Files
Lycostorrent/app/tmdb_api.py
2026-03-23 20:59:26 +01:00

371 lines
14 KiB
Python

import requests
import logging
import re
import json
import os
logger = logging.getLogger(__name__)
# Chemin vers le fichier de configuration des tags
PARSING_TAGS_PATH = '/app/config/parsing_tags.json'
# Tags par défaut (exportable pour reset)
DEFAULT_PARSING_TAGS = [
# Langues (non-ambigus)
"MULTi", "MULTI", "VOSTFR", "VOST", "VFF", "VFQ", "VFI",
"FRENCH", "TRUEFRENCH", "SUBFRENCH",
# Résolutions
"1080p", "720p", "480p", "2160p", "4K", "UHD",
# Sources
"WEB", "WEBRIP", "WEBDL", "WEB-DL", "HDTV", "BLURAY", "BDRIP", "BRRIP", "DVDRIP", "HDRip", "REMUX",
# Codecs
"x264", "x265", "HEVC", "H264", "H265", "AV1",
# Audio/Video
"HDR", "HDR10", "DV", "DOLBY", "ATMOS", "DTS", "AC3", "AAC", "FLAC", "TrueHD",
# Autres (non-ambigus)
"PROPER", "REPACK"
]
def _load_parsing_tags():
"""Charge les tags de parsing depuis le fichier JSON"""
try:
if os.path.exists(PARSING_TAGS_PATH):
with open(PARSING_TAGS_PATH, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get('technical_tags', DEFAULT_PARSING_TAGS)
except Exception as e:
logger.warning(f"Impossible de charger parsing_tags.json: {e}")
return DEFAULT_PARSING_TAGS.copy()
def _save_parsing_tags(tags):
"""Sauvegarde les tags de parsing dans le fichier JSON"""
try:
os.makedirs(os.path.dirname(PARSING_TAGS_PATH), exist_ok=True)
with open(PARSING_TAGS_PATH, 'w', encoding='utf-8') as f:
json.dump({'technical_tags': tags}, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
logger.error(f"Erreur sauvegarde parsing_tags.json: {e}")
return False
class TMDbAPI:
"""Classe pour interagir avec l'API TMDb (The Movie Database)"""
def __init__(self, api_key=None):
self.api_key = api_key
self.base_url = "https://api.themoviedb.org/3"
self.image_base_url = "https://image.tmdb.org/t/p/w500"
self.session = requests.Session()
def search_movie(self, title, year=None):
"""Recherche un film sur TMDb"""
try:
clean_title = self._clean_title(title)
params = {
'api_key': self.api_key,
'query': clean_title,
'language': 'fr-FR'
}
if year:
params['year'] = year
response = self.session.get(
f"{self.base_url}/search/movie",
params=params,
timeout=10
)
response.raise_for_status()
data = response.json()
if data.get('results'):
return self._format_movie(data['results'][0])
# Réessai sans année
if year:
params.pop('year')
response = self.session.get(
f"{self.base_url}/search/movie",
params=params,
timeout=10
)
response.raise_for_status()
data = response.json()
if data.get('results'):
return self._format_movie(data['results'][0])
return None
except Exception as e:
logger.error(f"Erreur recherche film TMDb: {e}")
return None
def search_tv(self, title, year=None):
"""Recherche une série sur TMDb"""
try:
clean_title = self._clean_title(title)
params = {
'api_key': self.api_key,
'query': clean_title,
'language': 'fr-FR'
}
if year:
params['first_air_date_year'] = year
response = self.session.get(
f"{self.base_url}/search/tv",
params=params,
timeout=10
)
response.raise_for_status()
data = response.json()
if data.get('results'):
return self._format_tv(data['results'][0])
# Réessai sans année
if year:
params.pop('first_air_date_year')
response = self.session.get(
f"{self.base_url}/search/tv",
params=params,
timeout=10
)
response.raise_for_status()
data = response.json()
if data.get('results'):
return self._format_tv(data['results'][0])
return None
except Exception as e:
logger.error(f"Erreur recherche série TMDb: {e}")
return None
def get_movie_videos(self, movie_id):
"""Récupère la bande-annonce d'un film"""
try:
# Essai en français d'abord
for lang in ['fr-FR', 'en-US']:
response = self.session.get(
f"{self.base_url}/movie/{movie_id}/videos",
params={'api_key': self.api_key, 'language': lang},
timeout=10
)
response.raise_for_status()
data = response.json()
for video in data.get('results', []):
if video.get('type') == 'Trailer' and video.get('site') == 'YouTube':
return f"https://www.youtube.com/watch?v={video['key']}"
return None
except Exception as e:
logger.error(f"Erreur récupération vidéos: {e}")
return None
def get_tv_videos(self, tv_id):
"""Récupère la bande-annonce d'une série"""
try:
for lang in ['fr-FR', 'en-US']:
response = self.session.get(
f"{self.base_url}/tv/{tv_id}/videos",
params={'api_key': self.api_key, 'language': lang},
timeout=10
)
response.raise_for_status()
data = response.json()
for video in data.get('results', []):
if video.get('type') == 'Trailer' and video.get('site') == 'YouTube':
return f"https://www.youtube.com/watch?v={video['key']}"
return None
except Exception as e:
logger.error(f"Erreur récupération vidéos série: {e}")
return None
def _format_movie(self, movie):
"""Formate les données d'un film"""
poster_path = movie.get('poster_path')
backdrop_path = movie.get('backdrop_path')
return {
'tmdb_id': movie.get('id'),
'title': movie.get('title'),
'original_title': movie.get('original_title'),
'overview': movie.get('overview') or 'Synopsis non disponible',
'release_date': movie.get('release_date'),
'year': movie.get('release_date', '')[:4] if movie.get('release_date') else None,
'poster_url': f"{self.image_base_url}{poster_path}" if poster_path else None,
'backdrop_url': f"{self.image_base_url}{backdrop_path}" if backdrop_path else None,
'vote_average': movie.get('vote_average'),
'vote_count': movie.get('vote_count'),
'popularity': movie.get('popularity'),
'type': 'movie'
}
def _format_tv(self, tv):
"""Formate les données d'une série"""
poster_path = tv.get('poster_path')
backdrop_path = tv.get('backdrop_path')
return {
'tmdb_id': tv.get('id'),
'title': tv.get('name'),
'original_title': tv.get('original_name'),
'overview': tv.get('overview') or 'Synopsis non disponible',
'first_air_date': tv.get('first_air_date'),
'year': tv.get('first_air_date', '')[:4] if tv.get('first_air_date') else None,
'poster_url': f"{self.image_base_url}{poster_path}" if poster_path else None,
'backdrop_url': f"{self.image_base_url}{backdrop_path}" if backdrop_path else None,
'vote_average': tv.get('vote_average'),
'vote_count': tv.get('vote_count'),
'popularity': tv.get('popularity'),
'type': 'tv'
}
def _clean_title(self, title):
"""Nettoie le titre pour la recherche TMDb - Version améliorée"""
original = title
# Charger les tags depuis le fichier de configuration
technical_tags = _load_parsing_tags()
# ============================================================
# ÉTAPE 1: Pré-nettoyage
# ============================================================
# Supprimer les tags entre crochets au début et à la fin
# [Team Arcedo] Title... ou Title...-[Shinrei]
title = re.sub(r'^\s*\[[^\]]*\]\s*', '', title) # Début
title = re.sub(r'\s*-?\[[^\]]*\]\s*$', '', title) # Fin
title = re.sub(r'\s*\[[^\]]*\]\s*', ' ', title) # Milieu (remplacer par espace)
# Remplacer points et underscores par espaces
title = title.replace('.', ' ').replace('_', ' ')
# ============================================================
# ÉTAPE 2: Gestion des alias (AKA)
# ============================================================
# "Napoleon vu par Abel Gance AKA Napoleon 1927" → garder avant AKA
if ' AKA ' in title.upper():
parts = re.split(r'\s+AKA\s+', title, flags=re.IGNORECASE)
title = parts[0].strip()
# ============================================================
# ÉTAPE 3: Trouver le point de coupure
# ============================================================
# Priorité 1: Année (19XX ou 20XX)
year_match = re.search(r'\b(19\d{2}|20\d{2})\b', title)
# Priorité 2: Format série S01E01, S01EP01, S01, E1154, EP01
serie_match = re.search(r'\b[Ss](\d{1,2})(?:[Ee][Pp]?(\d{1,4}))?\b|\b[Ee][Pp]?(\d{1,4})\b', title)
# Priorité 3: Tags techniques depuis la config
# Construire le pattern regex à partir des tags
escaped_tags = [re.escape(tag) for tag in technical_tags]
tech_pattern = r'\b(' + '|'.join(escaped_tags) + r')\b'
tech_match = re.search(tech_pattern, title, re.IGNORECASE)
# Déterminer le point de coupure (le plus tôt dans la chaîne)
cut_positions = []
if year_match:
cut_positions.append(year_match.start())
if serie_match:
cut_positions.append(serie_match.start())
if tech_match:
cut_positions.append(tech_match.start())
if cut_positions:
cut_pos = min(cut_positions)
title = title[:cut_pos].strip()
# ============================================================
# ÉTAPE 4: Nettoyage final
# ============================================================
# Supprimer DC (Director's Cut) en fin de titre
title = re.sub(r'\s+DC\s*$', '', title, flags=re.IGNORECASE)
# Supprimer les tirets de fin (souvent avant le groupe de release)
title = re.sub(r'\s*-\s*$', '', title)
# Supprimer les espaces multiples
title = re.sub(r'\s+', ' ', title).strip()
# Supprimer les mots orphelins courants en fin
title = re.sub(r'\s+(The|A|An|Le|La|Les|Un|Une|Des)$', '', title, flags=re.IGNORECASE)
# ============================================================
# ÉTAPE 5: Fallback si titre trop court
# ============================================================
if len(title) < 2:
# Reprendre l'original et faire extraction basique
title = original
title = re.sub(r'^\s*\[[^\]]*\]\s*', '', title)
title = title.replace('.', ' ').replace('_', ' ')
# Prendre les premiers mots avant un pattern technique
m = re.match(r'^([\w\s]+?)(?:\s+(?:S\d|E\d|\d{4}|iNTEGRALE|MULTi|VOSTFR|1080p|720p))', title, re.IGNORECASE)
if m:
title = m.group(1).strip()
else:
# Prendre les 3-4 premiers mots
words = title.split()[:4]
title = ' '.join(words)
logger.debug(f"Titre nettoyé: '{title}' (depuis: '{original[:80]}')")
return title
def enrich_torrent(self, torrent_title, category=None):
"""Enrichit un torrent avec les données TMDb"""
try:
# Détecter le type si non spécifié
if not category:
patterns = [r'S\d{2}E\d{2}', r'S\d{2}\s', r'saison\s*\d+', r'season\s*\d+']
for p in patterns:
if re.search(p, torrent_title, re.IGNORECASE):
category = 'tv'
break
if not category:
category = 'movie'
# Extraire l'année
m = re.search(r'\b(19\d{2}|20\d{2})\b', torrent_title)
year = int(m.group(0)) if m else None
clean = self._clean_title(torrent_title)
logger.info(f"🎬 Recherche TMDb: '{clean}' (type: {category}, année: {year})")
if category == 'movie':
data = self.search_movie(clean, year)
if data:
data['trailer_url'] = self.get_movie_videos(data['tmdb_id'])
logger.info(f"✅ Film trouvé: {data['title']}")
return data
else:
data = self.search_tv(clean, year)
if data:
data['trailer_url'] = self.get_tv_videos(data['tmdb_id'])
logger.info(f"✅ Série trouvée: {data['title']}")
return data
return None
except Exception as e:
logger.error(f"Erreur enrichissement TMDb: {e}")
return None