# File viewer metadata: 427 lines, 16 KiB, Python
"""
Transmission plugin for Lycostorrent.

Uses the Transmission RPC API.
"""
|
|
|
|
import requests
|
|
import json
|
|
import base64
|
|
import logging
|
|
from typing import Optional, List, Dict, Any
|
|
from .base import TorrentClientPlugin, TorrentClientConfig, TorrentInfo
|
|
|
|
# Module-level logger, named after this module so log records can be filtered per plugin.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TransmissionPlugin(TorrentClientPlugin):
    """Torrent client plugin that drives Transmission through its RPC API.

    All RPC traffic goes through a single ``requests.Session`` that carries
    the optional HTTP Basic credentials and the ``X-Transmission-Session-Id``
    header Transmission requires. ``.torrent`` downloads deliberately use a
    separate, short-lived session so those credentials are never sent to
    unrelated hosts.
    """

    PLUGIN_NAME = "Transmission"
    PLUGIN_DESCRIPTION = "Client torrent Transmission (API RPC)"
    PLUGIN_VERSION = "1.2.0"
    PLUGIN_AUTHOR = "Lycostorrent"

    # Transmission accepts .torrent files (downloaded first, then sent inline).
    SUPPORTS_TORRENT_FILES = True

    # Transmission status codes mapped to generic status names.
    # 0: stopped, 1: check pending, 2: checking, 3: download pending
    # 4: downloading, 5: seed pending, 6: seeding
    STATUS_MAP = {
        0: 'paused',
        1: 'checking',
        2: 'checking',
        3: 'queued',
        4: 'downloading',
        5: 'queued',
        6: 'seeding'
    }

    # Header used by Transmission for its CSRF-protection session token.
    _SESSION_HEADER = 'X-Transmission-Session-Id'

    # HTTP statuses followed manually when downloading a .torrent file.
    _REDIRECT_CODES = (301, 302, 303, 307, 308)
    _MAX_REDIRECTS = 5

    # Fields requested from "torrent-get" for both list and single lookups.
    # NOTE(review): recent Transmission versions may no longer return
    # "seeders" (seed counts appear to live under "trackerStats") — verify
    # against the RPC spec; until then `seeds` may always be 0.
    _TORRENT_FIELDS = [
        "id", "hashString", "name", "totalSize", "percentDone",
        "status", "rateDownload", "rateUpload", "seeders", "peersGettingFromUs",
        "downloadDir", "error", "errorString"
    ]

    def __init__(self, config: TorrentClientConfig):
        """Create the plugin and prepare the HTTP session used for RPC calls."""
        super().__init__(config)
        self._session = requests.Session()
        self._session_id = None

        # HTTP Basic Auth support (e.g. seedboxes behind a reverse proxy).
        if config.username and config.password:
            self._session.auth = (config.username, config.password)

    @property
    def rpc_url(self) -> str:
        """Full URL of the Transmission RPC endpoint."""
        return f"{self.config.base_url}/transmission/rpc"

    def _store_session_id(self, response) -> bool:
        """Record the session id carried by ``response``.

        Returns True when the response supplied an id (which is then installed
        on the RPC session), False otherwise.
        """
        self._session_id = response.headers.get(self._SESSION_HEADER)
        if not self._session_id:
            return False
        self._session.headers[self._SESSION_HEADER] = self._session_id
        return True

    def _get_session_id(self) -> bool:
        """Obtain the X-Transmission-Session-Id required by the API.

        Returns True when a usable session id is in place, False otherwise.
        """
        try:
            response = self._session.post(self.rpc_url, timeout=10)
        except Exception as e:
            logger.error(f"❌ Transmission: Erreur récupération session ID: {e}")
            return False

        # Transmission answers 409 and hands out the session id in the headers.
        if response.status_code == 409:
            return self._store_session_id(response)

        # 200 means the id already installed on the session is still valid;
        # resync the attribute so is_connected() does not see a stale None.
        if response.status_code == 200:
            if not self._session_id:
                self._session_id = self._session.headers.get(self._SESSION_HEADER)
            return True

        return False

    def _rpc_call(self, method: str, arguments: Optional[Dict] = None) -> Optional[Dict]:
        """Perform one RPC call.

        Args:
            method: Transmission RPC method name.
            arguments: Optional method arguments.

        Returns:
            The ``arguments`` dict of a successful reply (possibly empty), or
            None on any failure (transport error, HTTP error, RPC error).
        """
        if not self._session_id and not self._get_session_id():
            return None

        payload = {"method": method}
        if arguments:
            payload["arguments"] = arguments

        try:
            response = self._session.post(self.rpc_url, json=payload, timeout=30)

            # Session expired: retry once, but only if a new id was supplied.
            if response.status_code == 409 and self._store_session_id(response):
                response = self._session.post(self.rpc_url, json=payload, timeout=30)

            if response.status_code == 200:
                data = response.json()
                if data.get('result') == 'success':
                    return data.get('arguments', {})
                logger.error(f"❌ Transmission RPC error: {data.get('result')}")
                return None

            if response.status_code == 401:
                logger.error("❌ Transmission: Authentification requise")
                return None

            logger.error(f"❌ Transmission: Erreur HTTP {response.status_code}")
            return None

        except Exception as e:
            logger.error(f"❌ Transmission RPC: {e}")
            return None

    def connect(self) -> bool:
        """Connect to Transmission; returns True when ``session-get`` succeeds."""
        try:
            if not self._get_session_id():
                logger.error(f"❌ Transmission: Impossible de se connecter à {self.config.base_url}")
                return False

            # Validate the session with a cheap call.
            result = self._rpc_call("session-get")
            if not result:
                return False

            self._connected = True
            version = result.get('version', 'unknown')
            logger.info(f"✅ Connecté à Transmission {version}: {self.config.host}")
            return True

        except requests.exceptions.ConnectionError:
            logger.error(f"❌ Transmission: Impossible de se connecter à {self.config.base_url}")
            return False
        except Exception as e:
            logger.error(f"❌ Transmission: {e}")
            return False

    def disconnect(self) -> None:
        """Drop the connection state and release pooled HTTP connections."""
        self._connected = False
        self._session_id = None
        # Remove the stale token so a later connect() negotiates a fresh one.
        self._session.headers.pop(self._SESSION_HEADER, None)
        # The session stays usable after close(); new connections are opened
        # on demand.
        self._session.close()

    def is_connected(self) -> bool:
        """Check that the connection is live by issuing a ``session-get``."""
        if not self._connected or not self._session_id:
            return False

        return self._rpc_call("session-get") is not None

    def _ensure_connected(self) -> bool:
        """Return True if connected, connecting first when necessary."""
        if not self._connected:
            return self.connect()
        return True

    def _add_torrent(self, arguments: Dict, save_path: Optional[str], paused: bool,
                     label: str, prefer_server_name: bool = True) -> bool:
        """Send ``torrent-add`` and log the outcome.

        Args:
            arguments: Base arguments ("filename" or "metainfo"); extended in
                place with the download directory and paused flag.
            save_path: Optional download directory.
            paused: Whether to add the torrent paused.
            label: Name used in log messages (fallback when the server name
                is preferred but missing).
            prefer_server_name: Log the name reported by Transmission when
                available instead of ``label``.

        Returns:
            True when the torrent was added or was already present.
        """
        if save_path:
            arguments["download-dir"] = save_path
        if paused:
            arguments["paused"] = True

        result = self._rpc_call("torrent-add", arguments)
        if not result:
            return False

        if "torrent-added" in result:
            name = result['torrent-added'].get('name', label) if prefer_server_name else label
            logger.info(f"✅ Torrent ajouté: {name}")
            return True

        if "torrent-duplicate" in result:
            name = result['torrent-duplicate'].get('name', label) if prefer_server_name else label
            logger.warning(f"⚠️ Torrent déjà présent: {name}")
            return True

        return False

    def _download(self, url: str):
        """Fetch ``url``, following redirects manually so magnets can be spotted.

        Returns a ``(response, magnet)`` pair: ``magnet`` is the magnet URI a
        redirect pointed to (``response`` is then None); otherwise ``magnet``
        is None and ``response`` is the last HTTP response received.
        """
        from urllib.parse import urljoin, urlparse

        rpc_host = urlparse(self.config.base_url).netloc.lower()

        def fetch(session, target):
            # Only hand the Transmission credentials to the Transmission host
            # itself; every other host gets an unauthenticated request.
            same_host = urlparse(target).netloc.lower() == rpc_host
            auth = self._session.auth if same_host else None
            return session.get(target, timeout=30, allow_redirects=False, auth=auth)

        # A dedicated session keeps cookies across the redirect chain without
        # exposing the RPC session's auth and session-id header.
        with requests.Session() as fetcher:
            response = fetch(fetcher, url)
            hops = 0
            while response.status_code in self._REDIRECT_CODES and hops < self._MAX_REDIRECTS:
                location = response.headers.get('Location', '')
                if location.startswith('magnet:'):
                    return None, location
                if not location:
                    # Redirect without a target: nothing left to follow.
                    break
                # Location may be relative; resolve it against the current URL.
                url = urljoin(url, location)
                response = fetch(fetcher, url)
                hops += 1

        return response, None

    def add_torrent_url(self, url: str, save_path: Optional[str] = None,
                        category: Optional[str] = None, paused: bool = False) -> bool:
        """Add a torrent from a URL (magnet link or ``.torrent`` file).

        Magnet links are passed straight to Transmission. Any other URL is
        downloaded locally and sent as base64 metainfo; redirects to a magnet
        and bodies that contain a magnet link are handled as magnets.

        Args:
            url: Magnet URI or HTTP(S) URL of a ``.torrent`` file.
            save_path: Optional download directory.
            category: Accepted for interface compatibility; not used here.
            paused: Whether to add the torrent paused.

        Returns:
            True when the torrent was added or was already present.
        """
        if not self._ensure_connected():
            return False

        # Magnets are sent as-is.
        if url.startswith('magnet:'):
            return self._add_torrent({"filename": url}, save_path, paused, url[:50])

        try:
            logger.info(f"📥 Téléchargement du fichier torrent: {url[:80]}...")

            response, magnet = self._download(url)
            if magnet:
                logger.info("🔗 Redirection vers magnet détectée")
                return self.add_torrent_url(magnet, save_path, category, paused)

            if response.status_code != 200:
                logger.error(f"❌ Impossible de télécharger le torrent: HTTP {response.status_code}")
                return False

            content = response.content
            content_type = response.headers.get('Content-Type', '')

            if 'text/html' in content_type:
                logger.error("❌ L'URL pointe vers une page web, pas un fichier torrent")
                return False

            # A torrent file is a bencoded dictionary, so it starts with 'd'.
            if not content.startswith(b'd'):
                # The body may be a magnet link served as plain text.
                try:
                    text_content = content.decode('utf-8').strip()
                except UnicodeDecodeError:
                    text_content = ''

                if text_content.startswith('magnet:'):
                    logger.info("🔗 Contenu magnet détecté")
                    return self.add_torrent_url(text_content, save_path, category, paused)

                logger.error("❌ Le fichier téléchargé n'est pas un torrent valide (format bencode)")
                return False

            metainfo = base64.b64encode(content).decode('utf-8')
            return self._add_torrent({"metainfo": metainfo}, save_path, paused, 'unknown')

        except Exception as e:
            logger.error(f"❌ Erreur téléchargement torrent: {e}")
            return False

    def add_torrent_file(self, file_content: bytes, filename: str,
                         save_path: Optional[str] = None,
                         category: Optional[str] = None, paused: bool = False) -> bool:
        """Add a torrent from raw ``.torrent`` file content.

        Args:
            file_content: Bytes of the ``.torrent`` file.
            filename: Name used in log messages.
            save_path: Optional download directory.
            category: Accepted for interface compatibility; not used here.
            paused: Whether to add the torrent paused.

        Returns:
            True when the torrent was added or was already present.
        """
        if not self._ensure_connected():
            return False

        metainfo = base64.b64encode(file_content).decode('utf-8')
        return self._add_torrent({"metainfo": metainfo}, save_path, paused,
                                 filename, prefer_server_name=False)

    def _to_torrent_info(self, t: Dict) -> TorrentInfo:
        """Convert one ``torrent-get`` entry into a ``TorrentInfo``."""
        status = self.STATUS_MAP.get(t.get('status', 0), 'unknown')
        # A non-zero error code overrides the activity status.
        if (t.get('error') or 0) > 0:
            status = 'error'

        return TorrentInfo(
            hash=t.get('hashString', ''),
            name=t.get('name', ''),
            size=t.get('totalSize', 0),
            progress=t.get('percentDone', 0),
            status=status,
            download_speed=t.get('rateDownload', 0),
            upload_speed=t.get('rateUpload', 0),
            seeds=t.get('seeders', 0) or 0,
            peers=t.get('peersGettingFromUs', 0) or 0,
            save_path=t.get('downloadDir', '')
        )

    def get_torrents(self) -> List[TorrentInfo]:
        """Return every torrent known to Transmission (empty list on failure)."""
        if not self._ensure_connected():
            return []

        result = self._rpc_call("torrent-get", {"fields": list(self._TORRENT_FIELDS)})
        if not result or "torrents" not in result:
            return []

        return [self._to_torrent_info(t) for t in result["torrents"]]

    def get_torrent(self, torrent_hash: str) -> Optional[TorrentInfo]:
        """Return the torrent identified by ``torrent_hash``, or None if absent."""
        if not self._ensure_connected():
            return None

        # The hash string is passed as the torrent id.
        result = self._rpc_call("torrent-get", {
            "ids": [torrent_hash],
            "fields": list(self._TORRENT_FIELDS)
        })

        if not result or "torrents" not in result or not result["torrents"]:
            return None

        return self._to_torrent_info(result["torrents"][0])

    def pause_torrent(self, torrent_hash: str) -> bool:
        """Pause a torrent; True when the RPC call succeeded."""
        if not self._ensure_connected():
            return False

        result = self._rpc_call("torrent-stop", {"ids": [torrent_hash]})
        return result is not None

    def resume_torrent(self, torrent_hash: str) -> bool:
        """Resume a paused torrent; True when the RPC call succeeded."""
        if not self._ensure_connected():
            return False

        result = self._rpc_call("torrent-start", {"ids": [torrent_hash]})
        return result is not None

    def delete_torrent(self, torrent_hash: str, delete_files: bool = False) -> bool:
        """Remove a torrent, optionally deleting its downloaded data."""
        if not self._ensure_connected():
            return False

        result = self._rpc_call("torrent-remove", {
            "ids": [torrent_hash],
            "delete-local-data": delete_files
        })
        return result is not None

    def get_categories(self) -> List[str]:
        """Return the list of categories.

        This plugin does not expose any category system for Transmission, so
        the list is always empty.
        """
        return []

    def get_version(self) -> Optional[str]:
        """Return the Transmission version string, or None when unavailable."""
        if not self._ensure_connected():
            return None

        result = self._rpc_call("session-get")
        if result:
            return result.get('version')
        return None

    def test_connection(self) -> Dict[str, Any]:
        """Run the base connection test and add the version on success."""
        result = super().test_connection()

        if result["success"]:
            version = self.get_version()
            if version:
                result["version"] = version

        return result
|
|
|
|
|
|
# Entry point used by plugin auto-discovery.
PLUGIN_CLASS = TransmissionPlugin