371 lines
14 KiB
Python
371 lines
14 KiB
Python
import requests
|
|
import logging
|
|
import re
|
|
import json
|
|
import os
|
|
|
|
logger = logging.getLogger(__name__)

# Location of the JSON file that stores the parsing tags.
PARSING_TAGS_PATH = '/app/config/parsing_tags.json'

# Built-in tag groups. They are kept separate for readability only; the
# public constant below is their concatenation, in this exact order.
_LANGUAGE_TAGS = [
    "MULTi", "MULTI", "VOSTFR", "VOST", "VFF", "VFQ", "VFI",
    "FRENCH", "TRUEFRENCH", "SUBFRENCH",
]
_RESOLUTION_TAGS = ["1080p", "720p", "480p", "2160p", "4K", "UHD"]
_SOURCE_TAGS = [
    "WEB", "WEBRIP", "WEBDL", "WEB-DL", "HDTV", "BLURAY",
    "BDRIP", "BRRIP", "DVDRIP", "HDRip", "REMUX",
]
_CODEC_TAGS = ["x264", "x265", "HEVC", "H264", "H265", "AV1"]
_AUDIO_VIDEO_TAGS = [
    "HDR", "HDR10", "DV", "DOLBY", "ATMOS",
    "DTS", "AC3", "AAC", "FLAC", "TrueHD",
]
_MISC_TAGS = ["PROPER", "REPACK"]

# Default tags (module-level so they can be exported for a reset).
# Only unambiguous tokens are listed.
DEFAULT_PARSING_TAGS = [
    *_LANGUAGE_TAGS,
    *_RESOLUTION_TAGS,
    *_SOURCE_TAGS,
    *_CODEC_TAGS,
    *_AUDIO_VIDEO_TAGS,
    *_MISC_TAGS,
]
|
|
|
|
def _load_parsing_tags():
    """Load the technical parsing tags from the JSON configuration file.

    The file at ``PARSING_TAGS_PATH`` is expected to hold a JSON object with
    a ``technical_tags`` list of strings.

    Returns:
        A new list of tags that the caller may mutate freely.  A copy of
        ``DEFAULT_PARSING_TAGS`` is returned when the file is missing,
        unreadable, not valid JSON, or does not contain a ``technical_tags``
        list.  Entries that are not non-blank strings are dropped because
        they cannot be turned into a usable regex alternative.
    """
    try:
        # EAFP: open directly instead of checking os.path.exists() first.
        with open(PARSING_TAGS_PATH, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        # No configuration yet: silently use the defaults.
        return DEFAULT_PARSING_TAGS.copy()
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(f"Impossible de charger parsing_tags.json: {e}")
        return DEFAULT_PARSING_TAGS.copy()

    if not isinstance(config, dict):
        logger.warning("Impossible de charger parsing_tags.json: format invalide")
        return DEFAULT_PARSING_TAGS.copy()

    if 'technical_tags' not in config:
        # Always hand out a copy so callers can never mutate the defaults.
        return DEFAULT_PARSING_TAGS.copy()

    tags = config['technical_tags']
    if not isinstance(tags, list):
        logger.warning("Impossible de charger parsing_tags.json: 'technical_tags' doit être une liste")
        return DEFAULT_PARSING_TAGS.copy()

    return [tag for tag in tags if isinstance(tag, str) and tag.strip()]
|
|
|
|
|
|
def _save_parsing_tags(tags):
    """Write the parsing tags to the JSON configuration file.

    Args:
        tags: JSON-serializable collection stored under ``technical_tags``.

    Returns:
        ``True`` when the file was written, ``False`` when serialization or
        the write failed (the error is logged).
    """
    try:
        # Serialize before opening: mode 'w' truncates the file, so a
        # serialization error must happen before the old content is lost.
        payload = json.dumps({'technical_tags': tags}, indent=2, ensure_ascii=False)

        directory = os.path.dirname(PARSING_TAGS_PATH)
        if directory:  # os.makedirs('') raises for a bare file name
            os.makedirs(directory, exist_ok=True)

        with open(PARSING_TAGS_PATH, 'w', encoding='utf-8') as f:
            f.write(payload)
        return True
    except (OSError, TypeError, ValueError) as e:
        # TypeError/ValueError: non-serializable or circular data.
        logger.error(f"Erreur sauvegarde parsing_tags.json: {e}")
        return False
|
|
|
|
|
|
class TMDbAPI:
    """Client for the TMDb (The Movie Database) v3 REST API.

    The public methods are best-effort: any exception raised while querying
    or decoding a response is logged and reported as ``None``.
    """

    # A standalone four-digit year between 1900 and 2099.
    _YEAR_RE = re.compile(r'\b(19\d{2}|20\d{2})\b')

    # Season/episode markers: S01E01, S01EP01, S01, E1154, EP01.
    _EPISODE_RE = re.compile(
        r'\b[Ss](\d{1,2})(?:[Ee][Pp]?(\d{1,4}))?\b|\b[Ee][Pp]?(\d{1,4})\b'
    )

    # Hints that a raw release name is a TV show.  The third alternative
    # covers dot/underscore/dash separated season packs ("Show.S01.MULTi"),
    # which the whitespace-only "S01 " form does not match.
    _TV_HINT_RE = re.compile(
        r'S\d{2}E\d{2}'
        r'|S\d{2}\s'
        r'|(?<![A-Za-z0-9])S\d{2}(?=[._-]|$)'
        r'|(?:saison|season)[\s._-]*\d+',
        re.IGNORECASE,
    )

    def __init__(self, api_key=None):
        """Create a client.

        Args:
            api_key: TMDb API key sent as the ``api_key`` query parameter.
        """
        self.api_key = api_key
        self.base_url = "https://api.themoviedb.org/3"
        self.image_base_url = "https://image.tmdb.org/t/p/w500"
        self.session = requests.Session()

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search_movie(self, title, year=None):
        """Search TMDb for a movie and return the first result.

        Args:
            title: Raw or already cleaned title; it is passed through
                ``_clean_title`` before being sent.
            year: Optional release year.  When the year-restricted search
                returns nothing, the search is retried without it.

        Returns:
            The formatted first result (see ``_format_movie``), or ``None``
            when nothing is found or an error occurs.
        """
        try:
            return self._search_first('movie', title, year, 'year', self._format_movie)
        except Exception as e:
            logger.error(f"Erreur recherche film TMDb: {e}")
            return None

    def search_tv(self, title, year=None):
        """Search TMDb for a TV show and return the first result.

        Args:
            title: Raw or already cleaned title; it is passed through
                ``_clean_title`` before being sent.
            year: Optional first-air-date year.  When the year-restricted
                search returns nothing, the search is retried without it.

        Returns:
            The formatted first result (see ``_format_tv``), or ``None``
            when nothing is found or an error occurs.
        """
        try:
            return self._search_first('tv', title, year, 'first_air_date_year', self._format_tv)
        except Exception as e:
            logger.error(f"Erreur recherche série TMDb: {e}")
            return None

    def _search_first(self, endpoint, title, year, year_param, formatter):
        """Query ``/search/<endpoint>`` and format the first hit, if any."""
        base_params = {
            'api_key': self.api_key,
            'query': self._clean_title(title),
            'language': 'fr-FR',
        }
        # First try with the year filter (when given), then without it.
        attempts = [{**base_params, year_param: year}, base_params] if year else [base_params]

        for params in attempts:
            response = self.session.get(
                f"{self.base_url}/search/{endpoint}",
                params=params,
                timeout=10,
            )
            response.raise_for_status()
            results = response.json().get('results')
            if results:
                return formatter(results[0])
        return None

    # ------------------------------------------------------------------
    # Trailers
    # ------------------------------------------------------------------

    def get_movie_videos(self, movie_id):
        """Return the YouTube trailer URL of a movie, or ``None``.

        French videos are queried first, then English ones.
        """
        try:
            return self._find_trailer('movie', movie_id)
        except Exception as e:
            logger.error(f"Erreur récupération vidéos: {e}")
            return None

    def get_tv_videos(self, tv_id):
        """Return the YouTube trailer URL of a TV show, or ``None``.

        French videos are queried first, then English ones.
        """
        try:
            return self._find_trailer('tv', tv_id)
        except Exception as e:
            logger.error(f"Erreur récupération vidéos série: {e}")
            return None

    def _find_trailer(self, media_type, media_id):
        """Return the first YouTube trailer URL listed for the given media."""
        for lang in ('fr-FR', 'en-US'):
            response = self.session.get(
                f"{self.base_url}/{media_type}/{media_id}/videos",
                params={'api_key': self.api_key, 'language': lang},
                timeout=10,
            )
            response.raise_for_status()

            for video in response.json().get('results') or []:
                is_trailer = video.get('type') == 'Trailer' and video.get('site') == 'YouTube'
                # Skip entries without a key instead of aborting the lookup.
                if is_trailer and video.get('key'):
                    return f"https://www.youtube.com/watch?v={video['key']}"
        return None

    # ------------------------------------------------------------------
    # Formatting
    # ------------------------------------------------------------------

    def _format_movie(self, movie):
        """Convert a TMDb movie search result into the application dict."""
        return self._format_result(movie, 'movie', 'title', 'original_title', 'release_date')

    def _format_tv(self, tv):
        """Convert a TMDb TV search result into the application dict."""
        return self._format_result(tv, 'tv', 'name', 'original_name', 'first_air_date')

    def _format_result(self, item, media_type, title_key, original_title_key, date_key):
        """Build the shared result dict; ``date_key`` is also the output key."""
        date = item.get(date_key)
        return {
            'tmdb_id': item.get('id'),
            'title': item.get(title_key),
            'original_title': item.get(original_title_key),
            'overview': item.get('overview') or 'Synopsis non disponible',
            date_key: date,
            'year': date[:4] if date else None,
            'poster_url': self._image_url(item.get('poster_path')),
            'backdrop_url': self._image_url(item.get('backdrop_path')),
            'vote_average': item.get('vote_average'),
            'vote_count': item.get('vote_count'),
            'popularity': item.get('popularity'),
            'type': media_type,
        }

    def _image_url(self, path):
        """Return the absolute image URL for a TMDb image path, or ``None``."""
        return f"{self.image_base_url}{path}" if path else None

    # ------------------------------------------------------------------
    # Title cleaning
    # ------------------------------------------------------------------

    def _clean_title(self, title, technical_tags=None):
        """Extract a searchable title from a raw release name.

        Args:
            title: Raw release name, e.g. ``"[Team] Show.S01E02.MULTi.1080p"``.
            technical_tags: Optional list of tags marking the end of the
                title.  When ``None`` they are loaded with
                ``_load_parsing_tags()``.

        Returns:
            The part of the name that precedes the first year, season/episode
            marker or technical tag, with separators and release-group
            decorations removed.
        """
        original = title

        if technical_tags is None:
            technical_tags = _load_parsing_tags()

        # --- Step 1: pre-cleaning ------------------------------------
        # Drop bracketed groups: "[Team Arcedo] Title..." / "Title...-[Shinrei]".
        title = re.sub(r'^\s*\[[^\]]*\]\s*', '', title)      # leading
        title = re.sub(r'\s*-?\[[^\]]*\]\s*$', '', title)    # trailing
        title = re.sub(r'\s*\[[^\]]*\]\s*', ' ', title)      # in the middle
        # Dots and underscores are word separators in release names.
        title = title.replace('.', ' ').replace('_', ' ')

        # --- Step 2: aliases -----------------------------------------
        # "Napoleon vu par Abel Gance AKA Napoleon 1927" -> keep what precedes AKA.
        if ' AKA ' in title.upper():
            title = re.split(r'\s+AKA\s+', title, flags=re.IGNORECASE)[0].strip()

        # --- Step 3: cut at the earliest marker ----------------------
        year_match = self._YEAR_RE.search(title)
        serie_match = self._EPISODE_RE.search(title)

        # Blank or non-string tags would yield an empty alternative that
        # matches at position 0 (wiping the title) or crash re.escape().
        usable_tags = [t for t in technical_tags if isinstance(t, str) and t.strip()]
        tech_match = None
        if usable_tags:
            alternatives = '|'.join(re.escape(tag) for tag in usable_tags)
            # Lookarounds behave like \b for tags with word-character edges
            # and also work for tags such as "DD+" that end with a symbol.
            tech_match = re.search(rf'(?<!\w)(?:{alternatives})(?!\w)', title, re.IGNORECASE)

        cut_positions = [m.start() for m in (year_match, serie_match, tech_match) if m]
        if cut_positions:
            title = title[:min(cut_positions)].strip()

        # --- Step 4: final clean-up ----------------------------------
        # Trailing "DC" (Director's Cut).
        title = re.sub(r'\s+DC\s*$', '', title, flags=re.IGNORECASE)
        # Trailing dash (usually precedes the release group).
        title = re.sub(r'\s*-\s*$', '', title)
        # Collapse runs of whitespace.
        title = re.sub(r'\s+', ' ', title).strip()
        # Dangling article left at the end by the cut.
        title = re.sub(r'\s+(The|A|An|Le|La|Les|Un|Une|Des)$', '', title, flags=re.IGNORECASE)

        # --- Step 5: fallback when almost nothing is left -------------
        if len(title) < 2:
            # Restart from the raw name with a simpler extraction.
            title = re.sub(r'^\s*\[[^\]]*\]\s*', '', original)
            title = title.replace('.', ' ').replace('_', ' ')
            # Keep the words that precede an obvious technical pattern.
            m = re.match(
                r'^([\w\s]+?)(?:\s+(?:S\d|E\d|\d{4}|iNTEGRALE|MULTi|VOSTFR|1080p|720p))',
                title,
                re.IGNORECASE,
            )
            if m:
                title = m.group(1).strip()
            else:
                # Otherwise keep at most the first four words.
                title = ' '.join(title.split()[:4])

        logger.debug(f"Titre nettoyé: '{title}' (depuis: '{original[:80]}')")
        return title

    # ------------------------------------------------------------------
    # Enrichment
    # ------------------------------------------------------------------

    def enrich_torrent(self, torrent_title, category=None):
        """Look up TMDb metadata for a torrent name.

        Args:
            torrent_title: Raw torrent/release name.
            category: ``'movie'`` selects the movie search; any other truthy
                value selects the TV search.  When falsy, the category is
                guessed from season/episode markers in the name.

        Returns:
            The formatted search result with an extra ``trailer_url`` key, or
            ``None`` when nothing is found or an error occurs.
        """
        try:
            if not category:
                category = 'tv' if self._TV_HINT_RE.search(torrent_title) else 'movie'

            year_match = self._YEAR_RE.search(torrent_title)
            year = int(year_match.group(0)) if year_match else None

            clean = self._clean_title(torrent_title)
            logger.info(f"🎬 Recherche TMDb: '{clean}' (type: {category}, année: {year})")

            if category == 'movie':
                data = self.search_movie(clean, year)
                if data:
                    data['trailer_url'] = self.get_movie_videos(data['tmdb_id'])
                    logger.info(f"✅ Film trouvé: {data['title']}")
                    return data
            else:
                data = self.search_tv(clean, year)
                if data:
                    data['trailer_url'] = self.get_tv_videos(data['tmdb_id'])
                    logger.info(f"✅ Série trouvée: {data['title']}")
                    return data

            return None

        except Exception as e:
            logger.error(f"Erreur enrichissement TMDb: {e}")
            return None