""" Lycostorrent - Module de sécurité Version 2.0 Fonctionnalités : - Hash des mots de passe (bcrypt) - Rate limiting - Protection CSRF - Gestion sécurisée des sessions - Logs de sécurité """ import os import json import hashlib import secrets import logging import time from datetime import datetime, timedelta from pathlib import Path from functools import wraps from threading import Lock logger = logging.getLogger(__name__) # ============================================================ # CONFIGURATION # ============================================================ CONFIG_DIR = Path('/app/config') SECURITY_CONFIG_FILE = CONFIG_DIR / 'security.json' FAILED_ATTEMPTS_FILE = CONFIG_DIR / 'failed_attempts.json' # Paramètres de sécurité MAX_FAILED_ATTEMPTS = int(os.getenv('MAX_FAILED_ATTEMPTS', 5)) LOCKOUT_DURATION = int(os.getenv('LOCKOUT_DURATION', 300)) # 5 minutes RATE_LIMIT_WINDOW = int(os.getenv('RATE_LIMIT_WINDOW', 60)) # 1 minute RATE_LIMIT_MAX_REQUESTS = int(os.getenv('RATE_LIMIT_MAX_REQUESTS', 30)) # Lock pour thread-safety _lock = Lock() # ============================================================ # HASH DES MOTS DE PASSE # ============================================================ def hash_password(password: str) -> str: """ Hash un mot de passe avec SHA-256 + salt Format: salt$hash """ if not password: return '' salt = secrets.token_hex(16) hash_obj = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000 # iterations ) return f"{salt}${hash_obj.hex()}" def verify_password(password: str, hashed: str) -> bool: """ Vérifie un mot de passe contre son hash Supporte aussi la comparaison directe pour migration """ if not password or not hashed: return False # Si le hash ne contient pas de $, c'est un mot de passe en clair (legacy) if '$' not in hashed: return password == hashed try: salt, stored_hash = hashed.split('$', 1) hash_obj = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000 ) return hash_obj.hex() == stored_hash except Exception as e: logger.error(f"Erreur vérification mot de passe: {e}") return False def is_password_hashed(password: str) -> bool: """Vérifie si un mot de passe est déjà hashé""" if not password: return False # Un hash valide contient un $ et fait au moins 64 caractères (32 salt + $ + 64 hash) return '$' in password and len(password) >= 97 def migrate_password_if_needed(current_password: str) -> tuple: """ Migre un mot de passe en clair vers un hash si nécessaire Retourne (password_hash, was_migrated) """ if not current_password: return '', False if is_password_hashed(current_password): return current_password, False # Migrer le mot de passe hashed = hash_password(current_password) logger.info("🔐 Mot de passe migré vers format hashé") return hashed, True # ============================================================ # RATE LIMITING & PROTECTION BRUTE-FORCE # ============================================================ class RateLimiter: """Gestionnaire de rate limiting en mémoire""" def __init__(self): self.requests = {} # {ip: [(timestamp, count), ...]} self.failed_attempts = {} # {ip: {'count': int, 'locked_until': timestamp}} self._load_failed_attempts() def _load_failed_attempts(self): """Charge les tentatives échouées depuis le fichier""" try: if FAILED_ATTEMPTS_FILE.exists(): with open(FAILED_ATTEMPTS_FILE) as f: data = json.load(f) # Nettoyer les entrées expirées now = time.time() self.failed_attempts = { ip: info for ip, info in data.items() if info.get('locked_until', 0) > now or info.get('count', 0) > 0 } except Exception as e: logger.warning(f"Erreur chargement tentatives échouées: {e}") self.failed_attempts = {} def _save_failed_attempts(self): """Sauvegarde les tentatives échouées""" try: CONFIG_DIR.mkdir(parents=True, exist_ok=True) with open(FAILED_ATTEMPTS_FILE, 'w') as f: json.dump(self.failed_attempts, f) except Exception as e: logger.warning(f"Erreur sauvegarde tentatives échouées: {e}") def is_rate_limited(self, ip: str) -> bool: """Vérifie si une IP est rate-limitée""" now = time.time() with _lock: # Nettoyer les anciennes entrées if ip in self.requests: self.requests[ip] = [ (ts, count) for ts, count in self.requests[ip] if now - ts < RATE_LIMIT_WINDOW ] # Compter les requêtes récentes if ip in self.requests: total = sum(count for _, count in self.requests[ip]) if total >= RATE_LIMIT_MAX_REQUESTS: return True # Ajouter cette requête if ip not in self.requests: self.requests[ip] = [] self.requests[ip].append((now, 1)) return False def is_locked_out(self, ip: str) -> tuple: """ Vérifie si une IP est bloquée après trop de tentatives échouées Retourne (is_locked, remaining_seconds) """ with _lock: if ip not in self.failed_attempts: return False, 0 info = self.failed_attempts[ip] locked_until = info.get('locked_until', 0) if locked_until > time.time(): remaining = int(locked_until - time.time()) return True, remaining # Le lockout a expiré, réinitialiser if info.get('count', 0) >= MAX_FAILED_ATTEMPTS: self.failed_attempts[ip] = {'count': 0, 'locked_until': 0} self._save_failed_attempts() return False, 0 def record_failed_attempt(self, ip: str, username: str = None): """Enregistre une tentative de connexion échouée""" with _lock: if ip not in self.failed_attempts: self.failed_attempts[ip] = {'count': 0, 'locked_until': 0} self.failed_attempts[ip]['count'] = self.failed_attempts[ip].get('count', 0) + 1 count = self.failed_attempts[ip]['count'] logger.warning(f"⚠️ Tentative échouée #{count} depuis {ip}" + (f" (user: {username})" if username else "")) # Bloquer après trop de tentatives if count >= MAX_FAILED_ATTEMPTS: self.failed_attempts[ip]['locked_until'] = time.time() + LOCKOUT_DURATION logger.warning(f"🔒 IP bloquée pour {LOCKOUT_DURATION}s: {ip}") self._save_failed_attempts() def record_successful_login(self, ip: str): """Réinitialise les tentatives après une connexion réussie""" with _lock: if ip in self.failed_attempts: del self.failed_attempts[ip] self._save_failed_attempts() # Instance globale rate_limiter = RateLimiter() # ============================================================ # PROTECTION CSRF # ============================================================ def generate_csrf_token() -> str: """Génère un token CSRF""" return secrets.token_hex(32) def validate_csrf_token(session_token: str, form_token: str) -> bool: """Valide un token CSRF""" if not session_token or not form_token: return False return secrets.compare_digest(session_token, form_token) # ============================================================ # HEADERS DE SÉCURITÉ # ============================================================ def get_security_headers() -> dict: """Retourne les headers de sécurité HTTP recommandés""" return { 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'SAMEORIGIN', 'X-XSS-Protection': '1; mode=block', 'Referrer-Policy': 'strict-origin-when-cross-origin', 'Permissions-Policy': 'geolocation=(), microphone=(), camera=()', 'Content-Security-Policy': ( "default-src 'self'; " "script-src 'self' 'unsafe-inline' https://www.youtube.com https://s.ytimg.com; " "style-src 'self' 'unsafe-inline'; " "img-src 'self' data: https://image.tmdb.org https://i.ytimg.com https://*.last.fm; " "frame-src https://www.youtube.com https://www.youtube-nocookie.com; " "connect-src 'self'; " "font-src 'self'; " "object-src 'none'; " "base-uri 'self'; " "form-action 'self';" ), 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains' } # ============================================================ # VALIDATION DES ENTRÉES # ============================================================ def sanitize_input(value: str, max_length: int = 200) -> str: """Nettoie une entrée utilisateur""" if not value: return '' # Limiter la longueur value = str(value)[:max_length] # Supprimer les caractères de contrôle value = ''.join(char for char in value if ord(char) >= 32 or char in '\n\r\t') return value.strip() def validate_username(username: str) -> bool: """Valide un nom d'utilisateur""" if not username: return False if len(username) < 3 or len(username) > 50: return False # Autoriser lettres, chiffres, underscores, tirets import re return bool(re.match(r'^[a-zA-Z0-9_-]+$', username)) def get_client_ip(request) -> str: """Récupère l'IP réelle du client (gère les proxies)""" # Headers de proxy communs headers_to_check = [ 'X-Forwarded-For', 'X-Real-IP', 'CF-Connecting-IP', # Cloudflare 'True-Client-IP', ] for header in headers_to_check: if header in request.headers: # X-Forwarded-For peut contenir plusieurs IPs ip = request.headers[header].split(',')[0].strip() if ip: return ip return request.remote_addr or '127.0.0.1' # ============================================================ # GESTION SÉCURISÉE DE LA CONFIG # ============================================================ def load_security_config() -> dict: """Charge la configuration de sécurité""" default_config = { 'password_hash': '', 'password_migrated': False, 'last_password_change': None, 'created_at': datetime.now().isoformat() } try: if SECURITY_CONFIG_FILE.exists(): with open(SECURITY_CONFIG_FILE) as f: config = json.load(f) return {**default_config, **config} except Exception as e: logger.warning(f"Erreur chargement config sécurité: {e}") return default_config def save_security_config(config: dict): """Sauvegarde la configuration de sécurité""" try: CONFIG_DIR.mkdir(parents=True, exist_ok=True) with open(SECURITY_CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) # Permissions restrictives os.chmod(SECURITY_CONFIG_FILE, 0o600) except Exception as e: logger.error(f"Erreur sauvegarde config sécurité: {e}") # ============================================================ # LOGS DE SÉCURITÉ # ============================================================ def log_security_event(event_type: str, ip: str, details: str = None): """Log un événement de sécurité""" message = f"[SECURITY] {event_type} - IP: {ip}" if details: message += f" - {details}" if event_type in ['LOGIN_FAILED', 'LOCKOUT', 'CSRF_INVALID', 'RATE_LIMITED']: logger.warning(message) else: logger.info(message)