Files
Lycostorrent/app/prowlarr_api.py
2026-03-23 20:59:26 +01:00

265 lines
10 KiB
Python

import requests
import logging
from datetime import datetime
from dateutil import parser as date_parser
logger = logging.getLogger(__name__)
class ProwlarrAPI:
"""Classe pour interagir avec l'API Prowlarr"""
def __init__(self, base_url, api_key):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Lycostorrent/2.0',
'X-Api-Key': api_key
})
def get_indexers(self):
"""Récupère la liste des indexers configurés dans Prowlarr"""
try:
url = f"{self.base_url}/api/v1/indexer"
response = self.session.get(url, timeout=10)
response.raise_for_status()
data = response.json()
indexers = []
for indexer in data:
if indexer.get('enable', False):
indexers.append({
'id': str(indexer.get('id')),
'name': indexer.get('name', 'Unknown'),
'type': 'private' if indexer.get('privacy') == 'private' else 'public',
'source': 'prowlarr'
})
logger.info(f"{len(indexers)} indexers récupérés depuis Prowlarr")
return indexers
except requests.exceptions.RequestException as e:
logger.error(f"❌ Erreur connexion Prowlarr: {e}")
return []
except Exception as e:
logger.error(f"❌ Erreur récupération indexers Prowlarr: {e}")
return []
def search(self, query, indexers=None, category=None, max_results=2000):
"""
Effectue une recherche sur Prowlarr.
Note: Prowlarr ne filtre pas bien avec indexerIds/categories via l'API,
donc on fait la recherche globale et on filtre côté client si nécessaire.
"""
try:
# Si query vide, utiliser le fallback
if not query or query.strip() == '':
return self._get_latest(indexers, category, max_results)
url = f"{self.base_url}/api/v1/search"
# Prowlarr fonctionne mieux avec juste query + limit
# Les filtres indexerIds et categories semblent être ignorés ou mal interprétés
params = {
'query': query,
'limit': min(max_results, 100)
}
logger.info(f"🔍 Prowlarr search: query='{query}'")
response = self.session.get(url, params=params, timeout=60)
if response.status_code != 200:
logger.error(f"❌ Prowlarr error {response.status_code}: {response.text[:500]}")
return []
results = response.json()
logger.info(f"📦 Prowlarr: {len(results)} résultats bruts")
# Filtrer par indexer si spécifié
if indexers and len(indexers) > 0:
indexer_ids = [int(idx) if str(idx).isdigit() else idx for idx in indexers]
results = [r for r in results if r.get('indexerId') in indexer_ids]
logger.info(f"📦 Prowlarr après filtre indexers: {len(results)} résultats")
# Formater les résultats
formatted_results = []
for r in results[:max_results]:
try:
formatted = self._format_result(r)
formatted_results.append(formatted)
except Exception as e:
logger.warning(f"⚠️ Erreur formatage: {e}")
return formatted_results
except requests.exceptions.Timeout:
logger.error("⏱️ Timeout Prowlarr")
return []
except requests.exceptions.RequestException as e:
logger.error(f"❌ Erreur connexion Prowlarr: {e}")
return []
except Exception as e:
logger.error(f"❌ Erreur recherche Prowlarr: {e}", exc_info=True)
return []
def _get_latest(self, indexers=None, category=None, max_results=100):
"""
Récupère les dernières releases via l'API Prowlarr.
Essaie plusieurs termes de recherche si nécessaire.
"""
try:
# Termes à essayer dans l'ordre
search_terms = ['', 'a', 'e', 'the'] # Termes très génériques
url = f"{self.base_url}/api/v1/search"
for term in search_terms:
# Construire les paramètres
params = [('limit', min(max_results, 100))]
if term:
params.append(('query', term))
# Ajouter les indexerIds
if indexers and len(indexers) > 0:
for idx in indexers:
try:
params.append(('indexerIds', int(idx)))
except (ValueError, TypeError):
params.append(('indexerIds', idx))
# Ajouter les categories pour filtrer (films, séries, etc.)
if category:
cat_list = str(category).split(',')
for cat in cat_list:
cat_clean = cat.strip()
if cat_clean and cat_clean.isdigit():
params.append(('categories', int(cat_clean)))
logger.info(f"🔍 Prowlarr latest: term='{term}', indexerIds={indexers}, categories={category}")
response = self.session.get(url, params=params, timeout=60)
logger.debug(f"📡 Prowlarr URL: {response.url}")
if response.status_code != 200:
logger.warning(f"⚠️ Prowlarr error: {response.status_code}")
continue
results = response.json()
logger.info(f"📦 Prowlarr latest: {len(results)} résultats")
# Si on a des résultats, on arrête
if len(results) > 0:
formatted_results = []
for r in results[:max_results]:
try:
formatted = self._format_result(r)
formatted_results.append(formatted)
except Exception as e:
logger.warning(f"⚠️ Erreur formatage: {e}")
return formatted_results
# Aucun terme n'a fonctionné
logger.warning(f"⚠️ Prowlarr: aucun résultat avec tous les termes essayés")
return []
return formatted_results
except Exception as e:
logger.error(f"❌ Erreur Prowlarr latest: {e}")
return []
def _format_result(self, result):
"""Formate un résultat Prowlarr pour être compatible avec le format Jackett"""
try:
# Parser la date
publish_date = result.get('publishDate', '')
try:
if publish_date:
dt = date_parser.parse(publish_date)
formatted_date = dt.strftime('%Y-%m-%d %H:%M')
else:
formatted_date = 'N/A'
except:
formatted_date = 'N/A'
# Récupérer le nom de l'indexer
indexer = result.get('indexer', 'Unknown')
return {
'Title': result.get('title', 'Sans titre'),
'Tracker': indexer,
'Category': ', '.join(result.get('categories', [{'name': 'N/A'}])[0].get('name', 'N/A') if result.get('categories') else 'N/A'),
'PublishDate': formatted_date,
'PublishDateRaw': publish_date,
'Size': result.get('size', 0),
'SizeFormatted': self._format_size(result.get('size', 0)),
'Seeders': result.get('seeders', 0),
'Peers': result.get('leechers', 0),
'Link': result.get('downloadUrl', ''),
'MagnetUri': result.get('magnetUrl', ''),
'Guid': result.get('guid', ''),
'Details': result.get('infoUrl', ''),
'Source': 'prowlarr'
}
except Exception as e:
logger.warning(f"⚠️ Erreur formatage résultat Prowlarr: {e}")
return result
def _format_size(self, size_bytes):
"""Convertit une taille en bytes vers un format lisible"""
try:
size_bytes = int(size_bytes)
if size_bytes == 0:
return "0 B"
units = ['B', 'KB', 'MB', 'GB', 'TB']
unit_index = 0
size = float(size_bytes)
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024
unit_index += 1
if unit_index >= 2: # MB et plus
return f"{size:.2f} {units[unit_index]}"
else:
return f"{size:.0f} {units[unit_index]}"
except:
return "N/A"
def get_indexer_categories(self, indexer_id):
"""Récupère les catégories disponibles pour un indexer"""
try:
url = f"{self.base_url}/api/v1/indexer/{indexer_id}"
response = self.session.get(url, timeout=10)
response.raise_for_status()
data = response.json()
capabilities = data.get('capabilities', {})
categories = capabilities.get('categories', [])
result = []
for cat in categories:
result.append({
'id': str(cat.get('id')),
'name': cat.get('name', f"Catégorie {cat.get('id')}")
})
# Sous-catégories
for subcat in cat.get('subCategories', []):
result.append({
'id': str(subcat.get('id')),
'name': f"{subcat.get('name', '')}"
})
return result
except Exception as e:
logger.error(f"❌ Erreur récupération catégories Prowlarr: {e}")
return []