Files
2026-03-23 20:59:26 +01:00

427 lines
16 KiB
Python

"""
Plugin Transmission pour Lycostorrent.
Utilise l'API RPC de Transmission.
"""
import requests
import json
import base64
import logging
from typing import Optional, List, Dict, Any
from .base import TorrentClientPlugin, TorrentClientConfig, TorrentInfo
logger = logging.getLogger(__name__)
class TransmissionPlugin(TorrentClientPlugin):
"""Plugin pour Transmission via son API RPC."""
PLUGIN_NAME = "Transmission"
PLUGIN_DESCRIPTION = "Client torrent Transmission (API RPC)"
PLUGIN_VERSION = "1.2.0"
PLUGIN_AUTHOR = "Lycostorrent"
# Transmission supporte les fichiers .torrent (avec téléchargement préalable)
SUPPORTS_TORRENT_FILES = True
# Mapping des statuts Transmission vers statuts génériques
# 0: stopped, 1: check pending, 2: checking, 3: download pending
# 4: downloading, 5: seed pending, 6: seeding
STATUS_MAP = {
0: 'paused',
1: 'checking',
2: 'checking',
3: 'queued',
4: 'downloading',
5: 'queued',
6: 'seeding'
}
def __init__(self, config: TorrentClientConfig):
super().__init__(config)
self._session = requests.Session()
self._session_id = None
# Support HTTP Basic Auth (pour les seedbox avec reverse proxy)
if config.username and config.password:
self._session.auth = (config.username, config.password)
@property
def rpc_url(self) -> str:
return f"{self.config.base_url}/transmission/rpc"
def _get_session_id(self) -> bool:
"""Récupère le X-Transmission-Session-Id requis par l'API."""
try:
response = self._session.post(self.rpc_url, timeout=10)
# Transmission retourne 409 avec le session ID dans les headers
if response.status_code == 409:
self._session_id = response.headers.get('X-Transmission-Session-Id')
if self._session_id:
self._session.headers['X-Transmission-Session-Id'] = self._session_id
return True
# Si on a déjà un session ID valide
if response.status_code == 200:
return True
return False
except Exception as e:
logger.error(f"❌ Transmission: Erreur récupération session ID: {e}")
return False
def _rpc_call(self, method: str, arguments: Dict = None) -> Optional[Dict]:
"""Effectue un appel RPC à Transmission."""
if not self._session_id:
if not self._get_session_id():
return None
payload = {"method": method}
if arguments:
payload["arguments"] = arguments
try:
response = self._session.post(
self.rpc_url,
json=payload,
timeout=30
)
# Si session expirée, renouveler et réessayer
if response.status_code == 409:
self._session_id = response.headers.get('X-Transmission-Session-Id')
self._session.headers['X-Transmission-Session-Id'] = self._session_id
response = self._session.post(
self.rpc_url,
json=payload,
timeout=30
)
if response.status_code == 200:
data = response.json()
if data.get('result') == 'success':
return data.get('arguments', {})
else:
logger.error(f"❌ Transmission RPC error: {data.get('result')}")
return None
elif response.status_code == 401:
logger.error("❌ Transmission: Authentification requise")
return None
else:
logger.error(f"❌ Transmission: Erreur HTTP {response.status_code}")
return None
except Exception as e:
logger.error(f"❌ Transmission RPC: {e}")
return None
def connect(self) -> bool:
"""Connexion à Transmission via l'API RPC."""
try:
# Récupérer le session ID
if not self._get_session_id():
logger.error(f"❌ Transmission: Impossible de se connecter à {self.config.base_url}")
return False
# Tester avec un appel simple
result = self._rpc_call("session-get")
if result:
self._connected = True
version = result.get('version', 'unknown')
logger.info(f"✅ Connecté à Transmission {version}: {self.config.host}")
return True
else:
return False
except requests.exceptions.ConnectionError:
logger.error(f"❌ Transmission: Impossible de se connecter à {self.config.base_url}")
return False
except Exception as e:
logger.error(f"❌ Transmission: {e}")
return False
def disconnect(self) -> None:
"""Ferme la connexion avec Transmission."""
self._connected = False
self._session_id = None
def is_connected(self) -> bool:
"""Vérifie si la connexion est active."""
if not self._connected or not self._session_id:
return False
result = self._rpc_call("session-get")
return result is not None
def _ensure_connected(self) -> bool:
"""S'assure que la connexion est active, reconnecte si nécessaire."""
if not self._connected:
return self.connect()
return True
def add_torrent_url(self, url: str, save_path: Optional[str] = None,
category: Optional[str] = None, paused: bool = False) -> bool:
"""Ajoute un torrent via URL (magnet ou .torrent)."""
if not self._ensure_connected():
return False
# Si c'est un magnet, l'envoyer directement
if url.startswith('magnet:'):
arguments = {"filename": url}
if save_path:
arguments["download-dir"] = save_path
if paused:
arguments["paused"] = True
result = self._rpc_call("torrent-add", arguments)
if result:
if "torrent-added" in result:
logger.info(f"✅ Torrent ajouté: {result['torrent-added'].get('name', url[:50])}")
return True
elif "torrent-duplicate" in result:
logger.warning(f"⚠️ Torrent déjà présent: {result['torrent-duplicate'].get('name', url[:50])}")
return True
return False
# Si c'est une URL .torrent, télécharger le fichier et l'envoyer en base64
try:
logger.info(f"📥 Téléchargement du fichier torrent: {url[:80]}...")
# Télécharger le fichier torrent (sans suivre les redirections automatiquement)
response = self._session.get(url, timeout=30, allow_redirects=False)
# Gérer les redirections manuellement pour détecter les magnets
redirect_count = 0
while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
redirect_url = response.headers.get('Location', '')
# Si la redirection pointe vers un magnet, l'utiliser directement
if redirect_url.startswith('magnet:'):
logger.info(f"🔗 Redirection vers magnet détectée")
return self.add_torrent_url(redirect_url, save_path, category, paused)
# Sinon suivre la redirection
response = self._session.get(redirect_url, timeout=30, allow_redirects=False)
redirect_count += 1
if response.status_code != 200:
logger.error(f"❌ Impossible de télécharger le torrent: HTTP {response.status_code}")
return False
content = response.content
content_type = response.headers.get('Content-Type', '')
# Vérifier le Content-Type
if 'text/html' in content_type:
logger.error(f"❌ L'URL pointe vers une page web, pas un fichier torrent")
return False
# Vérifier que c'est bien un fichier torrent (commence par 'd' pour dictionnaire bencode)
if not content.startswith(b'd'):
# Peut-être que le contenu est un magnet en texte?
try:
text_content = content.decode('utf-8').strip()
if text_content.startswith('magnet:'):
logger.info(f"🔗 Contenu magnet détecté")
return self.add_torrent_url(text_content, save_path, category, paused)
except:
pass
logger.error("❌ Le fichier téléchargé n'est pas un torrent valide (format bencode)")
return False
# Envoyer en base64
metainfo = base64.b64encode(content).decode('utf-8')
arguments = {"metainfo": metainfo}
if save_path:
arguments["download-dir"] = save_path
if paused:
arguments["paused"] = True
result = self._rpc_call("torrent-add", arguments)
if result:
if "torrent-added" in result:
logger.info(f"✅ Torrent ajouté: {result['torrent-added'].get('name', 'unknown')}")
return True
elif "torrent-duplicate" in result:
logger.warning(f"⚠️ Torrent déjà présent: {result['torrent-duplicate'].get('name', 'unknown')}")
return True
return False
except Exception as e:
logger.error(f"❌ Erreur téléchargement torrent: {e}")
return False
def add_torrent_file(self, file_content: bytes, filename: str,
save_path: Optional[str] = None,
category: Optional[str] = None, paused: bool = False) -> bool:
"""Ajoute un torrent via fichier."""
if not self._ensure_connected():
return False
# Encoder le fichier en base64
metainfo = base64.b64encode(file_content).decode('utf-8')
arguments = {"metainfo": metainfo}
if save_path:
arguments["download-dir"] = save_path
if paused:
arguments["paused"] = True
result = self._rpc_call("torrent-add", arguments)
if result:
if "torrent-added" in result:
logger.info(f"✅ Torrent ajouté: {filename}")
return True
elif "torrent-duplicate" in result:
logger.warning(f"⚠️ Torrent déjà présent: {filename}")
return True
return False
def get_torrents(self) -> List[TorrentInfo]:
"""Récupère la liste de tous les torrents."""
if not self._ensure_connected():
return []
fields = [
"id", "hashString", "name", "totalSize", "percentDone",
"status", "rateDownload", "rateUpload", "seeders", "peersGettingFromUs",
"downloadDir", "error", "errorString"
]
result = self._rpc_call("torrent-get", {"fields": fields})
if not result or "torrents" not in result:
return []
torrents = []
for t in result["torrents"]:
status = self.STATUS_MAP.get(t.get('status', 0), 'unknown')
if t.get('error', 0) > 0:
status = 'error'
torrents.append(TorrentInfo(
hash=t.get('hashString', ''),
name=t.get('name', ''),
size=t.get('totalSize', 0),
progress=t.get('percentDone', 0),
status=status,
download_speed=t.get('rateDownload', 0),
upload_speed=t.get('rateUpload', 0),
seeds=t.get('seeders', 0) or 0,
peers=t.get('peersGettingFromUs', 0) or 0,
save_path=t.get('downloadDir', '')
))
return torrents
def get_torrent(self, torrent_hash: str) -> Optional[TorrentInfo]:
"""Récupère les informations d'un torrent spécifique."""
if not self._ensure_connected():
return None
fields = [
"id", "hashString", "name", "totalSize", "percentDone",
"status", "rateDownload", "rateUpload", "seeders", "peersGettingFromUs",
"downloadDir", "error", "errorString"
]
# Transmission utilise le hash pour identifier les torrents
result = self._rpc_call("torrent-get", {
"ids": [torrent_hash],
"fields": fields
})
if not result or "torrents" not in result or not result["torrents"]:
return None
t = result["torrents"][0]
status = self.STATUS_MAP.get(t.get('status', 0), 'unknown')
if t.get('error', 0) > 0:
status = 'error'
return TorrentInfo(
hash=t.get('hashString', ''),
name=t.get('name', ''),
size=t.get('totalSize', 0),
progress=t.get('percentDone', 0),
status=status,
download_speed=t.get('rateDownload', 0),
upload_speed=t.get('rateUpload', 0),
seeds=t.get('seeders', 0) or 0,
peers=t.get('peersGettingFromUs', 0) or 0,
save_path=t.get('downloadDir', '')
)
def pause_torrent(self, torrent_hash: str) -> bool:
"""Met un torrent en pause."""
if not self._ensure_connected():
return False
result = self._rpc_call("torrent-stop", {"ids": [torrent_hash]})
return result is not None
def resume_torrent(self, torrent_hash: str) -> bool:
"""Reprend un torrent en pause."""
if not self._ensure_connected():
return False
result = self._rpc_call("torrent-start", {"ids": [torrent_hash]})
return result is not None
def delete_torrent(self, torrent_hash: str, delete_files: bool = False) -> bool:
"""Supprime un torrent."""
if not self._ensure_connected():
return False
result = self._rpc_call("torrent-remove", {
"ids": [torrent_hash],
"delete-local-data": delete_files
})
return result is not None
def get_categories(self) -> List[str]:
"""
Récupère la liste des catégories.
Note: Transmission n'a pas de système de catégories natif,
on retourne une liste vide.
"""
return []
def get_version(self) -> Optional[str]:
"""Récupère la version de Transmission."""
if not self._ensure_connected():
return None
result = self._rpc_call("session-get")
if result:
return result.get('version')
return None
def test_connection(self) -> Dict[str, Any]:
"""Teste la connexion et retourne des infos."""
result = super().test_connection()
if result["success"]:
version = self.get_version()
if version:
result["version"] = version
return result
# Pour l'auto-découverte du plugin
PLUGIN_CLASS = TransmissionPlugin