Files
Lycostorrent/app/security.py
2026-03-23 20:59:26 +01:00

376 lines
12 KiB
Python

"""
Lycostorrent - Module de sécurité
Version 2.0
Fonctionnalités :
- Hash des mots de passe (bcrypt)
- Rate limiting
- Protection CSRF
- Gestion sécurisée des sessions
- Logs de sécurité
"""
import os
import json
import hashlib
import secrets
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
from functools import wraps
from threading import Lock
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURATION
# ============================================================
CONFIG_DIR = Path('/app/config')
SECURITY_CONFIG_FILE = CONFIG_DIR / 'security.json'
FAILED_ATTEMPTS_FILE = CONFIG_DIR / 'failed_attempts.json'
# Paramètres de sécurité
MAX_FAILED_ATTEMPTS = int(os.getenv('MAX_FAILED_ATTEMPTS', 5))
LOCKOUT_DURATION = int(os.getenv('LOCKOUT_DURATION', 300)) # 5 minutes
RATE_LIMIT_WINDOW = int(os.getenv('RATE_LIMIT_WINDOW', 60)) # 1 minute
RATE_LIMIT_MAX_REQUESTS = int(os.getenv('RATE_LIMIT_MAX_REQUESTS', 30))
# Lock pour thread-safety
_lock = Lock()
# ============================================================
# HASH DES MOTS DE PASSE
# ============================================================
def hash_password(password: str) -> str:
"""
Hash un mot de passe avec SHA-256 + salt
Format: salt$hash
"""
if not password:
return ''
salt = secrets.token_hex(16)
hash_obj = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000 # iterations
)
return f"{salt}${hash_obj.hex()}"
def verify_password(password: str, hashed: str) -> bool:
"""
Vérifie un mot de passe contre son hash
Supporte aussi la comparaison directe pour migration
"""
if not password or not hashed:
return False
# Si le hash ne contient pas de $, c'est un mot de passe en clair (legacy)
if '$' not in hashed:
return password == hashed
try:
salt, stored_hash = hashed.split('$', 1)
hash_obj = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000
)
return hash_obj.hex() == stored_hash
except Exception as e:
logger.error(f"Erreur vérification mot de passe: {e}")
return False
def is_password_hashed(password: str) -> bool:
"""Vérifie si un mot de passe est déjà hashé"""
if not password:
return False
# Un hash valide contient un $ et fait au moins 64 caractères (32 salt + $ + 64 hash)
return '$' in password and len(password) >= 97
def migrate_password_if_needed(current_password: str) -> tuple:
"""
Migre un mot de passe en clair vers un hash si nécessaire
Retourne (password_hash, was_migrated)
"""
if not current_password:
return '', False
if is_password_hashed(current_password):
return current_password, False
# Migrer le mot de passe
hashed = hash_password(current_password)
logger.info("🔐 Mot de passe migré vers format hashé")
return hashed, True
# ============================================================
# RATE LIMITING & PROTECTION BRUTE-FORCE
# ============================================================
class RateLimiter:
"""Gestionnaire de rate limiting en mémoire"""
def __init__(self):
self.requests = {} # {ip: [(timestamp, count), ...]}
self.failed_attempts = {} # {ip: {'count': int, 'locked_until': timestamp}}
self._load_failed_attempts()
def _load_failed_attempts(self):
"""Charge les tentatives échouées depuis le fichier"""
try:
if FAILED_ATTEMPTS_FILE.exists():
with open(FAILED_ATTEMPTS_FILE) as f:
data = json.load(f)
# Nettoyer les entrées expirées
now = time.time()
self.failed_attempts = {
ip: info for ip, info in data.items()
if info.get('locked_until', 0) > now or info.get('count', 0) > 0
}
except Exception as e:
logger.warning(f"Erreur chargement tentatives échouées: {e}")
self.failed_attempts = {}
def _save_failed_attempts(self):
"""Sauvegarde les tentatives échouées"""
try:
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
with open(FAILED_ATTEMPTS_FILE, 'w') as f:
json.dump(self.failed_attempts, f)
except Exception as e:
logger.warning(f"Erreur sauvegarde tentatives échouées: {e}")
def is_rate_limited(self, ip: str) -> bool:
"""Vérifie si une IP est rate-limitée"""
now = time.time()
with _lock:
# Nettoyer les anciennes entrées
if ip in self.requests:
self.requests[ip] = [
(ts, count) for ts, count in self.requests[ip]
if now - ts < RATE_LIMIT_WINDOW
]
# Compter les requêtes récentes
if ip in self.requests:
total = sum(count for _, count in self.requests[ip])
if total >= RATE_LIMIT_MAX_REQUESTS:
return True
# Ajouter cette requête
if ip not in self.requests:
self.requests[ip] = []
self.requests[ip].append((now, 1))
return False
def is_locked_out(self, ip: str) -> tuple:
"""
Vérifie si une IP est bloquée après trop de tentatives échouées
Retourne (is_locked, remaining_seconds)
"""
with _lock:
if ip not in self.failed_attempts:
return False, 0
info = self.failed_attempts[ip]
locked_until = info.get('locked_until', 0)
if locked_until > time.time():
remaining = int(locked_until - time.time())
return True, remaining
# Le lockout a expiré, réinitialiser
if info.get('count', 0) >= MAX_FAILED_ATTEMPTS:
self.failed_attempts[ip] = {'count': 0, 'locked_until': 0}
self._save_failed_attempts()
return False, 0
def record_failed_attempt(self, ip: str, username: str = None):
"""Enregistre une tentative de connexion échouée"""
with _lock:
if ip not in self.failed_attempts:
self.failed_attempts[ip] = {'count': 0, 'locked_until': 0}
self.failed_attempts[ip]['count'] = self.failed_attempts[ip].get('count', 0) + 1
count = self.failed_attempts[ip]['count']
logger.warning(f"⚠️ Tentative échouée #{count} depuis {ip}" +
(f" (user: {username})" if username else ""))
# Bloquer après trop de tentatives
if count >= MAX_FAILED_ATTEMPTS:
self.failed_attempts[ip]['locked_until'] = time.time() + LOCKOUT_DURATION
logger.warning(f"🔒 IP bloquée pour {LOCKOUT_DURATION}s: {ip}")
self._save_failed_attempts()
def record_successful_login(self, ip: str):
"""Réinitialise les tentatives après une connexion réussie"""
with _lock:
if ip in self.failed_attempts:
del self.failed_attempts[ip]
self._save_failed_attempts()
# Instance globale
rate_limiter = RateLimiter()
# ============================================================
# PROTECTION CSRF
# ============================================================
def generate_csrf_token() -> str:
"""Génère un token CSRF"""
return secrets.token_hex(32)
def validate_csrf_token(session_token: str, form_token: str) -> bool:
"""Valide un token CSRF"""
if not session_token or not form_token:
return False
return secrets.compare_digest(session_token, form_token)
# ============================================================
# HEADERS DE SÉCURITÉ
# ============================================================
def get_security_headers() -> dict:
"""Retourne les headers de sécurité HTTP recommandés"""
return {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'SAMEORIGIN',
'X-XSS-Protection': '1; mode=block',
'Referrer-Policy': 'strict-origin-when-cross-origin',
'Permissions-Policy': 'geolocation=(), microphone=(), camera=()',
'Content-Security-Policy': (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://www.youtube.com https://s.ytimg.com; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https://image.tmdb.org https://i.ytimg.com https://*.last.fm; "
"frame-src https://www.youtube.com https://www.youtube-nocookie.com; "
"connect-src 'self'; "
"font-src 'self'; "
"object-src 'none'; "
"base-uri 'self'; "
"form-action 'self';"
),
'Strict-Transport-Security': 'max-age=31536000; includeSubDomains'
}
# ============================================================
# VALIDATION DES ENTRÉES
# ============================================================
def sanitize_input(value: str, max_length: int = 200) -> str:
"""Nettoie une entrée utilisateur"""
if not value:
return ''
# Limiter la longueur
value = str(value)[:max_length]
# Supprimer les caractères de contrôle
value = ''.join(char for char in value if ord(char) >= 32 or char in '\n\r\t')
return value.strip()
def validate_username(username: str) -> bool:
"""Valide un nom d'utilisateur"""
if not username:
return False
if len(username) < 3 or len(username) > 50:
return False
# Autoriser lettres, chiffres, underscores, tirets
import re
return bool(re.match(r'^[a-zA-Z0-9_-]+$', username))
def get_client_ip(request) -> str:
"""Récupère l'IP réelle du client (gère les proxies)"""
# Headers de proxy communs
headers_to_check = [
'X-Forwarded-For',
'X-Real-IP',
'CF-Connecting-IP', # Cloudflare
'True-Client-IP',
]
for header in headers_to_check:
if header in request.headers:
# X-Forwarded-For peut contenir plusieurs IPs
ip = request.headers[header].split(',')[0].strip()
if ip:
return ip
return request.remote_addr or '127.0.0.1'
# ============================================================
# GESTION SÉCURISÉE DE LA CONFIG
# ============================================================
def load_security_config() -> dict:
"""Charge la configuration de sécurité"""
default_config = {
'password_hash': '',
'password_migrated': False,
'last_password_change': None,
'created_at': datetime.now().isoformat()
}
try:
if SECURITY_CONFIG_FILE.exists():
with open(SECURITY_CONFIG_FILE) as f:
config = json.load(f)
return {**default_config, **config}
except Exception as e:
logger.warning(f"Erreur chargement config sécurité: {e}")
return default_config
def save_security_config(config: dict):
"""Sauvegarde la configuration de sécurité"""
try:
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
with open(SECURITY_CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
# Permissions restrictives
os.chmod(SECURITY_CONFIG_FILE, 0o600)
except Exception as e:
logger.error(f"Erreur sauvegarde config sécurité: {e}")
# ============================================================
# LOGS DE SÉCURITÉ
# ============================================================
def log_security_event(event_type: str, ip: str, details: str = None):
"""Log un événement de sécurité"""
message = f"[SECURITY] {event_type} - IP: {ip}"
if details:
message += f" - {details}"
if event_type in ['LOGIN_FAILED', 'LOCKOUT', 'CSRF_INVALID', 'RATE_LIMITED']:
logger.warning(message)
else:
logger.info(message)