import requests import logging from datetime import datetime from dateutil import parser as date_parser logger = logging.getLogger(__name__) class ProwlarrAPI: """Classe pour interagir avec l'API Prowlarr""" def __init__(self, base_url, api_key): self.base_url = base_url.rstrip('/') self.api_key = api_key self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Lycostorrent/2.0', 'X-Api-Key': api_key }) def get_indexers(self): """Récupère la liste des indexers configurés dans Prowlarr""" try: url = f"{self.base_url}/api/v1/indexer" response = self.session.get(url, timeout=10) response.raise_for_status() data = response.json() indexers = [] for indexer in data: if indexer.get('enable', False): indexers.append({ 'id': str(indexer.get('id')), 'name': indexer.get('name', 'Unknown'), 'type': 'private' if indexer.get('privacy') == 'private' else 'public', 'source': 'prowlarr' }) logger.info(f"✅ {len(indexers)} indexers récupérés depuis Prowlarr") return indexers except requests.exceptions.RequestException as e: logger.error(f"❌ Erreur connexion Prowlarr: {e}") return [] except Exception as e: logger.error(f"❌ Erreur récupération indexers Prowlarr: {e}") return [] def search(self, query, indexers=None, category=None, max_results=2000): """ Effectue une recherche sur Prowlarr. Note: Prowlarr ne filtre pas bien avec indexerIds/categories via l'API, donc on fait la recherche globale et on filtre côté client si nécessaire. """ try: # Si query vide, utiliser le fallback if not query or query.strip() == '': return self._get_latest(indexers, category, max_results) url = f"{self.base_url}/api/v1/search" # Prowlarr fonctionne mieux avec juste query + limit # Les filtres indexerIds et categories semblent être ignorés ou mal interprétés params = { 'query': query, 'limit': min(max_results, 100) } logger.info(f"🔍 Prowlarr search: query='{query}'") response = self.session.get(url, params=params, timeout=60) if response.status_code != 200: logger.error(f"❌ Prowlarr error {response.status_code}: {response.text[:500]}") return [] results = response.json() logger.info(f"📦 Prowlarr: {len(results)} résultats bruts") # Filtrer par indexer si spécifié if indexers and len(indexers) > 0: indexer_ids = [int(idx) if str(idx).isdigit() else idx for idx in indexers] results = [r for r in results if r.get('indexerId') in indexer_ids] logger.info(f"📦 Prowlarr après filtre indexers: {len(results)} résultats") # Formater les résultats formatted_results = [] for r in results[:max_results]: try: formatted = self._format_result(r) formatted_results.append(formatted) except Exception as e: logger.warning(f"⚠️ Erreur formatage: {e}") return formatted_results except requests.exceptions.Timeout: logger.error("⏱️ Timeout Prowlarr") return [] except requests.exceptions.RequestException as e: logger.error(f"❌ Erreur connexion Prowlarr: {e}") return [] except Exception as e: logger.error(f"❌ Erreur recherche Prowlarr: {e}", exc_info=True) return [] def _get_latest(self, indexers=None, category=None, max_results=100): """ Récupère les dernières releases via l'API Prowlarr. Essaie plusieurs termes de recherche si nécessaire. """ try: # Termes à essayer dans l'ordre search_terms = ['', 'a', 'e', 'the'] # Termes très génériques url = f"{self.base_url}/api/v1/search" for term in search_terms: # Construire les paramètres params = [('limit', min(max_results, 100))] if term: params.append(('query', term)) # Ajouter les indexerIds if indexers and len(indexers) > 0: for idx in indexers: try: params.append(('indexerIds', int(idx))) except (ValueError, TypeError): params.append(('indexerIds', idx)) # Ajouter les categories pour filtrer (films, séries, etc.) if category: cat_list = str(category).split(',') for cat in cat_list: cat_clean = cat.strip() if cat_clean and cat_clean.isdigit(): params.append(('categories', int(cat_clean))) logger.info(f"🔍 Prowlarr latest: term='{term}', indexerIds={indexers}, categories={category}") response = self.session.get(url, params=params, timeout=60) logger.debug(f"📡 Prowlarr URL: {response.url}") if response.status_code != 200: logger.warning(f"⚠️ Prowlarr error: {response.status_code}") continue results = response.json() logger.info(f"📦 Prowlarr latest: {len(results)} résultats") # Si on a des résultats, on arrête if len(results) > 0: formatted_results = [] for r in results[:max_results]: try: formatted = self._format_result(r) formatted_results.append(formatted) except Exception as e: logger.warning(f"⚠️ Erreur formatage: {e}") return formatted_results # Aucun terme n'a fonctionné logger.warning(f"⚠️ Prowlarr: aucun résultat avec tous les termes essayés") return [] return formatted_results except Exception as e: logger.error(f"❌ Erreur Prowlarr latest: {e}") return [] def _format_result(self, result): """Formate un résultat Prowlarr pour être compatible avec le format Jackett""" try: # Parser la date publish_date = result.get('publishDate', '') try: if publish_date: dt = date_parser.parse(publish_date) formatted_date = dt.strftime('%Y-%m-%d %H:%M') else: formatted_date = 'N/A' except: formatted_date = 'N/A' # Récupérer le nom de l'indexer indexer = result.get('indexer', 'Unknown') return { 'Title': result.get('title', 'Sans titre'), 'Tracker': indexer, 'Category': ', '.join(result.get('categories', [{'name': 'N/A'}])[0].get('name', 'N/A') if result.get('categories') else 'N/A'), 'PublishDate': formatted_date, 'PublishDateRaw': publish_date, 'Size': result.get('size', 0), 'SizeFormatted': self._format_size(result.get('size', 0)), 'Seeders': result.get('seeders', 0), 'Peers': result.get('leechers', 0), 'Link': result.get('downloadUrl', ''), 'MagnetUri': result.get('magnetUrl', ''), 'Guid': result.get('guid', ''), 'Details': result.get('infoUrl', ''), 'Source': 'prowlarr' } except Exception as e: logger.warning(f"⚠️ Erreur formatage résultat Prowlarr: {e}") return result def _format_size(self, size_bytes): """Convertit une taille en bytes vers un format lisible""" try: size_bytes = int(size_bytes) if size_bytes == 0: return "0 B" units = ['B', 'KB', 'MB', 'GB', 'TB'] unit_index = 0 size = float(size_bytes) while size >= 1024 and unit_index < len(units) - 1: size /= 1024 unit_index += 1 if unit_index >= 2: # MB et plus return f"{size:.2f} {units[unit_index]}" else: return f"{size:.0f} {units[unit_index]}" except: return "N/A" def get_indexer_categories(self, indexer_id): """Récupère les catégories disponibles pour un indexer""" try: url = f"{self.base_url}/api/v1/indexer/{indexer_id}" response = self.session.get(url, timeout=10) response.raise_for_status() data = response.json() capabilities = data.get('capabilities', {}) categories = capabilities.get('categories', []) result = [] for cat in categories: result.append({ 'id': str(cat.get('id')), 'name': cat.get('name', f"Catégorie {cat.get('id')}") }) # Sous-catégories for subcat in cat.get('subCategories', []): result.append({ 'id': str(subcat.get('id')), 'name': f" └ {subcat.get('name', '')}" }) return result except Exception as e: logger.error(f"❌ Erreur récupération catégories Prowlarr: {e}") return []