Files
Seedmover/app.py
2026-03-23 22:24:24 +01:00

688 lines
26 KiB
Python

import eventlet
eventlet.monkey_patch()
import os
import ctypes
import json
import uuid
import threading
import time
import hmac
from functools import wraps
from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from flask_socketio import SocketIO
import plugins as plugin_registry
app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'seedmover-secret')
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['PERMANENT_SESSION_LIFETIME'] = int(os.environ.get('SESSION_LIFETIME', 86400))
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='eventlet')
SEEDBOX_PATH = os.environ.get('SEEDBOX_PATH', '/mnt/seedbox')
NAS_PATH = os.environ.get('NAS_PATH', '/mnt/nas')
APP_TITLE = os.environ.get('APP_TITLE', 'SeedMover')
APP_USERNAME = os.environ.get('APP_USERNAME', 'admin')
APP_PASSWORD = os.environ.get('APP_PASSWORD', 'changeme')
CACHE_TTL = int(os.environ.get('CACHE_TTL', 60))
DATA_DIR = os.environ.get('DATA_DIR', '/app/data')
CONNECTIONS_FILE = os.path.join(DATA_DIR, 'connections.json')
os.makedirs(DATA_DIR, exist_ok=True)
HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
HISTORY_MAX = int(os.environ.get('HISTORY_MAX', 50))
history_lock = threading.Lock()
def _load_history():
try:
with open(HISTORY_FILE) as f:
return json.load(f)
except Exception:
return []
def _save_history(entries):
with open(HISTORY_FILE, 'w') as f:
json.dump(entries[:HISTORY_MAX], f, indent=2)
def _add_history(entry):
with history_lock:
entries = _load_history()
entries.insert(0, entry)
_save_history(entries)
# ─── Connexions persistées ────────────────────────────────────────────────────
connections_lock = threading.Lock()
def _load_connections():
try:
with open(CONNECTIONS_FILE) as f:
return json.load(f)
except Exception:
return {}
def _save_connections(conns):
with open(CONNECTIONS_FILE, 'w') as f:
json.dump(conns, f, indent=2)
def _ensure_default_connections():
conns = _load_connections()
changed = False
if 'seedbox' not in conns:
conns['seedbox'] = {
'id': 'seedbox', 'name': 'Seedbox', 'type': 'local',
'readonly': True, 'config': {'root_path': SEEDBOX_PATH}
}
changed = True
if 'nas' not in conns:
conns['nas'] = {
'id': 'nas', 'name': 'NAS', 'type': 'local',
'readonly': False, 'config': {'root_path': NAS_PATH}
}
changed = True
if changed:
_save_connections(conns)
_ensure_default_connections()
# ─── Pool de connexions FS ────────────────────────────────────────────────────
fs_pool = {} # connection_id -> instance FS
fs_pool_lock = threading.Lock()
fs_conn_locks = {} # connection_id -> RLock (sérialise les accès SFTP)
def _get_conn_lock(connection_id):
"""Retourne un verrou dédié à cette connexion (crée si absent)."""
with fs_pool_lock:
if connection_id not in fs_conn_locks:
fs_conn_locks[connection_id] = threading.RLock()
return fs_conn_locks[connection_id]
def get_fs(connection_id):
conns = _load_connections()
conn_def = conns.get(connection_id)
if not conn_def:
raise ValueError(f"Connexion inconnue : {connection_id}")
with fs_pool_lock:
fs = fs_pool.get(connection_id)
if fs and fs.is_connected():
return fs, conn_def
cls = plugin_registry.get_plugin(conn_def['type'])
if not cls:
raise ValueError(f"Plugin inconnu : {conn_def['type']}")
fs = cls()
fs.connect(conn_def['config'])
fs_pool[connection_id] = fs
return fs, conn_def
def invalidate_fs(connection_id):
with fs_pool_lock:
fs_pool.pop(connection_id, None)
# ─── Cache ────────────────────────────────────────────────────────────────────
dir_cache = {}
dir_cache_lock = threading.Lock()
def cache_get(key):
with dir_cache_lock:
entry = dir_cache.get(key)
if entry and (time.time() - entry['ts']) < CACHE_TTL:
return entry['data']
return None
def cache_set(key, data):
with dir_cache_lock:
dir_cache[key] = {'data': data, 'ts': time.time()}
def cache_invalidate(connection_id, path):
import posixpath, os.path as osp
try:
parent = posixpath.dirname(path) or osp.dirname(path)
except Exception:
parent = path
with dir_cache_lock:
for k in list(dir_cache.keys()):
if k in (f"{connection_id}:{path}", f"{connection_id}:{parent}"):
dir_cache.pop(k, None)
# ─── File de transfert ───────────────────────────────────────────────────────
transfer_queue = []
transfer_lock = threading.Lock()
transfer_thread = None
current_transfer = None
current_percent = 0
transfer_stop = False
transfer_pause = False
class TransferStopped(Exception):
pass
def _trim_memory():
"""Force glibc à rendre la mémoire inutilisée au système après un transfert."""
import gc
gc.collect()
try:
ctypes.cdll.LoadLibrary("libc.so.6").malloc_trim(0)
except Exception:
pass # Non-Linux, on ignore
def format_size(size):
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} PB"
def copy_between_fs(src_fs, src_path, dst_fs, dst_path, transfer_id):
global current_percent, transfer_stop, transfer_pause
total = src_fs.get_total_size(src_path)
copied = [0]
def open_dst(d_path):
"""Ouvre le fichier destination en écriture, retourne un handle."""
parent = dst_fs.dirname(d_path)
if parent:
dst_fs.mkdir(parent)
try:
# SFTP — garder le handle ouvert tout le transfert
return dst_fs._sftp.open(d_path, 'wb')
except AttributeError:
# Local
return open(d_path, 'wb')
def stream_file(s_path, d_path):
"""Copie chunk par chunk sans jamais accumuler en mémoire."""
handle = open_dst(d_path)
try:
for chunk in src_fs.read_chunks(s_path, chunk_size=1024*1024):
while transfer_pause and not transfer_stop:
eventlet.sleep(0.2)
if transfer_stop:
raise TransferStopped()
handle.write(chunk) # écriture directe, pas de buffer
copied[0] += len(chunk)
pct = int(copied[0] / total * 100) if total > 0 else 100
current_percent = pct
eventlet.sleep(0) # céder la main à eventlet
socketio.emit('transfer_progress', {
'id': transfer_id, 'percent': pct,
'copied': copied[0], 'total': total,
'copied_fmt': format_size(copied[0]),
'total_fmt': format_size(total)
})
finally:
handle.close()
try:
if src_fs.isdir(src_path):
name = src_fs.basename(src_path)
dst_base = dst_fs.join(dst_path, name)
dst_fs.mkdir(dst_base)
for root, dirs, files in src_fs.walk(src_path):
rel = src_fs.relpath(root, src_path)
dst_root = dst_fs.join(dst_base, rel) if rel and rel != '.' else dst_base
dst_fs.mkdir(dst_root)
for fname in files:
stream_file(src_fs.join(root, fname), dst_fs.join(dst_root, fname))
else:
stream_file(src_path, dst_fs.join(dst_path, src_fs.basename(src_path)))
socketio.emit('transfer_done', {'id': transfer_id, 'success': True})
except TransferStopped:
try:
p = dst_fs.join(dst_path, src_fs.basename(src_path))
if dst_fs.exists(p):
dst_fs.remove(p)
except Exception:
pass
socketio.emit('transfer_done', {
'id': transfer_id, 'success': False,
'error': "Arrêté par l'utilisateur"
})
except Exception as e:
socketio.emit('transfer_done', {'id': transfer_id, 'success': False, 'error': str(e)})
def queue_worker():
global transfer_queue, current_transfer, transfer_thread
global transfer_stop, transfer_pause, current_percent
while True:
with transfer_lock:
if not transfer_queue:
current_transfer = None
current_percent = 0
transfer_thread = None
return
transfer = transfer_queue.pop(0)
current_transfer = transfer
transfer_stop = False
transfer_pause = False
socketio.emit('transfer_started', {'id': transfer['id'], 'name': transfer['name']})
try:
src_fs, _ = get_fs(transfer['src_connection'])
dst_fs, _ = get_fs(transfer['dst_connection'])
src_lock = _get_conn_lock(transfer['src_connection'])
dst_lock = _get_conn_lock(transfer['dst_connection'])
# Acquérir les deux verrous dans un ordre fixe pour éviter le deadlock
locks = sorted([
(transfer['src_connection'], src_lock),
(transfer['dst_connection'], dst_lock)
], key=lambda x: x[0])
t_start = time.time()
with locks[0][1]:
with locks[1][1]:
copy_between_fs(src_fs, transfer['src'], dst_fs, transfer['dst'], transfer['id'])
t_end = time.time()
cache_invalidate(transfer['dst_connection'], transfer['dst'])
# Historique — on récupère la taille réelle si possible
try:
fsize = src_fs.get_total_size(transfer['src'])
except Exception:
fsize = 0
_add_history({
'name': transfer['name'],
'src': transfer['src'],
'dst': transfer['dst'],
'src_connection': transfer['src_connection'],
'dst_connection': transfer['dst_connection'],
'size': fsize,
'duration': round(t_end - t_start, 1),
'date': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t_end)),
'success': True
})
except Exception as e:
_add_history({
'name': transfer.get('name', ''),
'src': transfer.get('src', ''),
'dst': transfer.get('dst', ''),
'src_connection': transfer.get('src_connection', ''),
'dst_connection': transfer.get('dst_connection', ''),
'size': 0,
'duration': 0,
'date': time.strftime('%Y-%m-%d %H:%M:%S'),
'success': False,
'error': str(e)
})
socketio.emit('transfer_done', {'id': transfer['id'], 'success': False, 'error': str(e)})
with transfer_lock:
current_transfer = None
current_percent = 0
_trim_memory()
time.sleep(0.1)
# ─── Auth ─────────────────────────────────────────────────────────────────────
def check_password(password):
return hmac.compare_digest(password, APP_PASSWORD)
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get('logged_in'):
if request.is_json or request.path.startswith('/api/'):
return jsonify({'error': 'Non authentifié'}), 401
return redirect(url_for('login', next=request.path))
return f(*args, **kwargs)
return decorated
# ─── Routes Auth ──────────────────────────────────────────────────────────────
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
if username == APP_USERNAME and check_password(password):
session.permanent = True
session['logged_in'] = True
next_url = request.args.get('next', '/')
if not next_url.startswith('/'):
next_url = '/'
return redirect(next_url)
error = 'Identifiants incorrects'
return render_template('login.html', title=APP_TITLE, error=error)
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))
@app.route('/')
@login_required
def index():
return render_template('index.html',
title=APP_TITLE,
plugins=plugin_registry.list_plugins())
# ─── API Connexions ───────────────────────────────────────────────────────────
@app.route('/api/connections')
@login_required
def list_connections():
conns = _load_connections()
safe = []
for c in conns.values():
if not isinstance(c, dict) or 'type' not in c:
continue # ignorer les clés internes comme __defaults__
sc = {k: v for k, v in c.items() if k != 'config'}
cfg_safe = {}
for k, v in c.get('config', {}).items():
cfg_safe[k] = '***' if any(x in k.lower() for x in ['password', 'key']) else v
sc['config'] = cfg_safe
cls = plugin_registry.get_plugin(c['type'])
sc['type_label'] = cls.PLUGIN_LABEL if cls else c['type']
safe.append(sc)
return jsonify(safe)
@app.route('/api/connections', methods=['POST'])
@login_required
def add_connection():
data = request.json
conn_type = data.get('type')
name = data.get('name', '').strip()
config = data.get('config', {})
if not conn_type or not name:
return jsonify({'error': 'type et name requis'}), 400
cls = plugin_registry.get_plugin(conn_type)
if not cls:
return jsonify({'error': f'Plugin inconnu : {conn_type}'}), 400
try:
fs = cls()
fs.connect(config)
fs.list(config.get('root_path', '/'))
fs.disconnect()
except Exception as e:
return jsonify({'error': f'Connexion échouée : {e}'}), 400
conn_id = str(uuid.uuid4())[:8]
conn = {'id': conn_id, 'name': name, 'type': conn_type,
'readonly': data.get('readonly', False), 'config': config}
with connections_lock:
conns = _load_connections()
conns[conn_id] = conn
_save_connections(conns)
return jsonify({'success': True, 'id': conn_id})
@app.route('/api/connections/<conn_id>', methods=['PUT'])
@login_required
def update_connection(conn_id):
data = request.json
name = data.get('name', '').strip()
config = data.get('config', {})
if not name:
return jsonify({'error': 'name requis'}), 400
with connections_lock:
conns = _load_connections()
if conn_id not in conns:
return jsonify({'error': 'Connexion introuvable'}), 404
conn = conns[conn_id]
conn_type = conn['type']
# Fusionner la config : garder les valeurs masquées (***) de l'ancienne config
merged_config = dict(conn.get('config', {}))
for k, v in config.items():
if v != '***':
merged_config[k] = v
# Tester la connexion avec la nouvelle config
cls = plugin_registry.get_plugin(conn_type)
try:
fs = cls()
fs.connect(merged_config)
fs.list(merged_config.get('root_path', '/'))
fs.disconnect()
except Exception as e:
return jsonify({'error': f'Connexion échouée : {e}'}), 400
conn['name'] = name
conn['config'] = merged_config
conn['readonly'] = data.get('readonly', conn.get('readonly', False))
conns[conn_id] = conn
_save_connections(conns)
invalidate_fs(conn_id)
return jsonify({'success': True})
@app.route('/api/connections/<conn_id>', methods=['DELETE'])
@login_required
def delete_connection(conn_id):
if conn_id in ('seedbox', 'nas'):
return jsonify({'error': 'Impossible de supprimer les connexions par défaut'}), 400
with connections_lock:
conns = _load_connections()
conns.pop(conn_id, None)
_save_connections(conns)
invalidate_fs(conn_id)
return jsonify({'success': True})
@app.route('/api/connections/<conn_id>/test', methods=['POST'])
@login_required
def test_connection(conn_id):
try:
fs, conn_def = get_fs(conn_id)
fs.list(conn_def['config'].get('root_path', '/'))
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
# ─── API Fichiers ─────────────────────────────────────────────────────────────
@app.route('/api/list')
@login_required
def list_dir():
connection_id = request.args.get('connection', 'seedbox')
path = request.args.get('path', '')
force = request.args.get('force', 'false').lower() == 'true'
cache_key = f"{connection_id}:{path}"
if not force:
cached = cache_get(cache_key)
if cached is not None:
return jsonify(cached)
try:
fs, conn_def = get_fs(connection_id)
root = conn_def['config'].get('root_path', '/')
if not path:
path = root
conn_lock = _get_conn_lock(connection_id)
with conn_lock:
items = fs.list(path)
result = {'items': items, 'path': path, 'readonly': conn_def.get('readonly', False)}
cache_set(cache_key, result)
# Prefetch uniquement pour les FS locaux (SFTP non thread-safe sur même connexion)
if conn_def.get('type') == 'local':
subdirs = [i['path'] for i in items if i['is_dir']]
if subdirs:
def prefetch():
for sd in subdirs[:5]:
k = f"{connection_id}:{sd}"
if cache_get(k) is None:
try:
i2 = fs.list(sd)
cache_set(k, {'items': i2, 'path': sd,
'readonly': conn_def.get('readonly', False)})
except Exception:
pass
eventlet.sleep(0)
eventlet.spawn(prefetch)
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e), 'items': []})
@app.route('/api/mkdir', methods=['POST'])
@login_required
def mkdir():
data = request.json
connection_id = data.get('connection', 'nas')
path = data.get('path', '')
name = data.get('name', '').strip()
if not name or '/' in name or '..' in name:
return jsonify({'error': 'Nom invalide'}), 400
try:
fs, conn_def = get_fs(connection_id)
if conn_def.get('readonly'):
return jsonify({'error': 'Connexion en lecture seule'}), 403
fs.mkdir(fs.join(path, name))
cache_invalidate(connection_id, path)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/rename', methods=['POST'])
@login_required
def rename():
data = request.json
connection_id = data.get('connection', 'nas')
old_path = data.get('old_path', '')
new_name = data.get('new_name', '').strip()
if not new_name or '/' in new_name or '..' in new_name:
return jsonify({'error': 'Nom invalide'}), 400
try:
fs, conn_def = get_fs(connection_id)
if conn_def.get('readonly'):
return jsonify({'error': 'Connexion en lecture seule'}), 403
parent = fs.dirname(old_path)
new_path = fs.join(parent, new_name)
fs.rename(old_path, new_path)
cache_invalidate(connection_id, parent)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── API Queue ────────────────────────────────────────────────────────────────
@app.route('/api/queue/add', methods=['POST'])
@login_required
def add_to_queue():
global transfer_thread
data = request.json
src = data.get('src')
dst = data.get('dst')
src_connection = data.get('src_connection', 'seedbox')
dst_connection = data.get('dst_connection', 'nas')
name = data.get('name', '')
if not src or not dst:
return jsonify({'error': 'src et dst requis'}), 400
conns = _load_connections()
dst_conn = conns.get(dst_connection, {})
if dst_conn.get('readonly'):
return jsonify({'error': 'Destination en lecture seule'}), 403
item_name = name or src.split('/')[-1]
force = data.get('force', False)
# Vérifier si le fichier/dossier existe déjà à destination
if not force:
try:
dst_fs, _ = get_fs(dst_connection)
dst_full = dst_fs.join(dst, item_name)
if dst_fs.exists(dst_full):
try:
existing_size = dst_fs.get_total_size(dst_full)
except Exception:
existing_size = 0
return jsonify({
'exists': True,
'name': item_name,
'dst_path': dst_full,
'existing_size': existing_size
})
except Exception:
pass # En cas d'erreur de vérification, on laisse passer
transfer_id = f"t_{int(time.time() * 1000)}"
transfer = {
'id': transfer_id, 'src': src, 'dst': dst,
'src_connection': src_connection, 'dst_connection': dst_connection,
'name': item_name
}
with transfer_lock:
transfer_queue.append(transfer)
queue_snapshot = [{'id': t['id'], 'name': t['name']} for t in transfer_queue]
socketio.emit('queue_updated', {'queue': queue_snapshot})
if transfer_thread is None or not transfer_thread.is_alive():
transfer_thread = threading.Thread(target=queue_worker, daemon=True)
transfer_thread.start()
return jsonify({'success': True, 'id': transfer_id})
@app.route('/api/queue/remove', methods=['POST'])
@login_required
def remove_from_queue():
data = request.json
transfer_id = data.get('id')
with transfer_lock:
transfer_queue[:] = [t for t in transfer_queue if t['id'] != transfer_id]
queue_snapshot = [{'id': t['id'], 'name': t['name']} for t in transfer_queue]
socketio.emit('queue_updated', {'queue': queue_snapshot})
return jsonify({'success': True})
@app.route('/api/queue')
@login_required
def get_queue():
with transfer_lock:
queue_snapshot = [{'id': t['id'], 'name': t['name']} for t in transfer_queue]
cur = {
'id': current_transfer['id'], 'name': current_transfer['name'],
'percent': current_percent
} if current_transfer else None
return jsonify({'queue': queue_snapshot, 'current': cur, 'paused': transfer_pause})
@app.route('/api/transfer/stop', methods=['POST'])
@login_required
def transfer_stop_route():
global transfer_stop, transfer_pause
transfer_stop = True
transfer_pause = False
return jsonify({'success': True})
@app.route('/api/transfer/pause', methods=['POST'])
@login_required
def transfer_pause_route():
global transfer_pause
transfer_pause = not transfer_pause
socketio.emit('transfer_paused', {'paused': transfer_pause})
return jsonify({'success': True, 'paused': transfer_pause})
@app.route('/api/panels/default', methods=['GET'])
@login_required
def get_default_panels():
conns = _load_connections()
defaults = conns.get('__defaults__', {
'left': {'connection': 'seedbox', 'path': ''},
'right': {'connection': 'nas', 'path': ''}
})
return jsonify(defaults)
@app.route('/api/panels/default', methods=['POST'])
@login_required
def save_default_panels():
data = request.json
with connections_lock:
conns = _load_connections()
conns['__defaults__'] = {
'left': {'connection': data.get('left_connection', 'seedbox'),
'path': data.get('left_path', '')},
'right': {'connection': data.get('right_connection', 'nas'),
'path': data.get('right_path', '')}
}
_save_connections(conns)
return jsonify({'success': True})
@app.route('/api/history')
@login_required
def get_history():
return jsonify(_load_history())
@app.route('/api/history/clear', methods=['POST'])
@login_required
def clear_history():
with history_lock:
_save_history([])
return jsonify({'success': True})
@app.route('/api/plugins')
@login_required
def get_plugins():
return jsonify(plugin_registry.list_plugins())
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=5000, debug=False, allow_unsafe_werkzeug=True)