1040 lines
42 KiB
Python
1040 lines
42 KiB
Python
"""
|
|
Git Update Checker - GUI multi-repo.
|
|
Vérifie les mises à jour de plusieurs dépôts Git et propose de les télécharger.
|
|
Accès lecture seule uniquement (fetch/pull/checkout, jamais de push).
|
|
Tous les chemins sont relatifs à l'emplacement de l'exécutable.
|
|
"""
|
|
|
|
# Version of this program. It is shown in the window title and compared with
# the remote version.txt by check_self_update().
VERSION = "0.6.6"
|
|
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
import configparser
|
|
import logging
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import tkinter as tk
|
|
from tkinter import ttk, messagebox
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
# Force UTF-8 on Windows.
if sys.platform == "win32":
    # Switch the console code page to 65001 (UTF-8); the command's own output
    # is discarded by the redirections.
    os.system("chcp 65001 >nul 2>&1")
    try:
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")
        sys.stderr.reconfigure(encoding="utf-8", errors="replace")
    except Exception:
        # Best effort: any failure to reconfigure the streams is ignored.
        pass
|
|
|
|
|
|
# ── Utilitaires ──────────────────────────────────────────────────────────────
|
|
|
|
def get_exe_dir():
    """Return the directory that holds the running program.

    When ``sys.frozen`` is set (frozen executable) this is the folder of
    ``sys.executable``; otherwise it is the folder containing this source file.
    """
    frozen = getattr(sys, "frozen", False)
    anchor = sys.executable if frozen else __file__
    return Path(anchor).parent
|
|
|
|
|
|
def resolve_relative(path_str):
    """Resolve *path_str*, anchoring relative paths at the program directory.

    Absolute paths are used as given. The result is always passed through
    ``Path.resolve()`` before being returned.
    """
    candidate = Path(path_str)
    anchored = candidate if candidate.is_absolute() else get_exe_dir() / candidate
    return anchored.resolve()
|
|
|
|
|
|
# ── Logging ──────────────────────────────────────────────────────────────────
|
|
|
|
def setup_logging():
    """Configure and return the "GitUpdateChecker" logger.

    Log records go to ``log/<YYYY-MM-DD>.log`` next to the program. If the log
    folder or file cannot be created (for example on read-only media), the
    logger gets a ``NullHandler`` instead so that the program can still start.
    Calling this function again returns the already-configured logger without
    attaching a second handler.

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger("GitUpdateChecker")
    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        # Already configured: adding another handler would duplicate every line.
        return logger

    log_dir = get_exe_dir() / "log"
    try:
        log_dir.mkdir(exist_ok=True)
        log_file = log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.log"
        fh = logging.FileHandler(log_file, encoding="utf-8")
    except OSError:
        # The folder is not writable: run without a log file rather than
        # failing at import time.
        logger.addHandler(logging.NullHandler())
        return logger

    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S"))
    logger.addHandler(fh)

    # Keep only the 30 last *.log files in sorted-name order (date-named files
    # sort chronologically); deletion is best effort.
    for old_log in sorted(log_dir.glob("*.log"))[:-30]:
        try:
            old_log.unlink()
        except OSError:
            pass

    return logger
|
|
|
|
|
|
# Module-wide logger, created once at import time and shared by the rest of this module.
log = setup_logging()
|
|
|
|
|
|
def run_git(args, cwd=None, timeout=30):
    """Run a git command and capture its output.

    Args:
        args: list of git arguments, without the leading ``git``.
        cwd: working directory for the command (``None`` = current directory).
        timeout: maximum run time in seconds.

    Returns:
        tuple ``(returncode, stdout, stderr)`` with both streams stripped.
        ``returncode`` is ``-1`` when the git executable cannot be started and
        ``1`` (with stderr ``"Timeout"``) when the command times out.
    """
    # -c safe.directory=* : avoids the "dubious ownership" error on USB drives
    cmd = ["git", "-c", "safe.directory=*"] + args
    log.debug(f"git {' '.join(args)} (cwd={cwd})")
    try:
        result = subprocess.run(
            cmd, cwd=cwd, capture_output=True, text=True,
            # Decode explicitly as UTF-8: without this the locale code page is
            # used, and bytes it cannot decode would raise UnicodeDecodeError.
            encoding="utf-8", errors="replace",
            timeout=timeout,
            creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
        )
    except FileNotFoundError:
        log.error("Git non trouve dans le PATH")
        return -1, "", "Git n'est pas installe ou pas dans le PATH."
    except subprocess.TimeoutExpired:
        log.error(f"Timeout: git {' '.join(args)}")
        return 1, "", "Timeout"

    stdout = result.stdout.strip()
    stderr = result.stderr.strip()
    if result.returncode != 0 and stderr:
        # args[:1] instead of args[0] so an empty argument list cannot raise here.
        log.warning(f"git {' '.join(args[:1])} erreur: {stderr}")
    return result.returncode, stdout, stderr
|
|
|
|
|
|
# ── Auto-update du programme ─────────────────────────────────────────────────
|
|
|
|
def _version_tuple(v):
    """Turn a dotted version string such as '0.4' into the int tuple (0, 4).

    Input that cannot be parsed (a non-numeric component, or a value without
    ``strip``) yields ``(0,)``.
    """
    try:
        pieces = v.strip().split(".")
        return tuple(map(int, pieces))
    except (ValueError, AttributeError):
        return (0,)
|
|
|
|
|
|
def _get_self_update_config():
    """Read the ``[self-update]`` section of config.ini.

    Returns:
        tuple ``(url, exe_name, branch)``. ``url`` has surrounding whitespace
        and trailing slashes removed. ``(None, None, None)`` is returned when
        the file or the section is missing, when the URL is empty, or when the
        file cannot be parsed.
    """
    missing = (None, None, None)
    config_path = get_config_path()
    if not config_path.exists():
        return missing

    config = configparser.ConfigParser()
    try:
        config.read(config_path, encoding="utf-8")
        if not config.has_section("self-update"):
            return missing
        url = config.get("self-update", "url", fallback="").strip().rstrip("/")
        exe_name = config.get("self-update", "exe_name", fallback="GitUpdateChecker.exe").strip()
        branch = config.get("self-update", "branch", fallback="master").strip()
    except (configparser.Error, UnicodeDecodeError) as e:
        # A malformed or non-UTF-8 config.ini must not raise into the caller
        # (this runs in a worker thread); treat it as "no self-update configured".
        log.warning(f"Auto-update: config.ini illisible: {e}")
        return missing

    if not url:
        return missing
    return url, exe_name, branch
|
|
|
|
|
|
def check_self_update():
    """Check over HTTP whether a newer program version is published.

    Downloads ``<url>/raw/branch/<branch>/version.txt`` (URL and branch come
    from the ``[self-update]`` section of config.ini) and compares its content
    with the local ``VERSION``.

    Returns:
        tuple ``(needs_update, info)``. ``needs_update`` is True only when the
        remote version is strictly greater than the local one; ``info`` then
        holds both versions. When the server cannot be reached,
        ``needs_update`` is False and ``info`` carries the error text.
    """
    repo_url, _, branch = _get_self_update_config()
    if not repo_url:
        log.info("Auto-update: pas de section [self-update] dans config.ini, skip")
        return False, ""

    log.info("Auto-update: verification via HTTP...")

    version_url = f"{repo_url}/raw/branch/{branch}/version.txt"
    try:
        req = urllib.request.Request(version_url, headers={"User-Agent": "GitUpdateChecker"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            # "utf-8-sig" drops a leading BOM that would otherwise make the
            # first version component unparsable; undecodable bytes are replaced.
            remote_version = resp.read().decode("utf-8-sig", errors="replace").strip()
    except (urllib.error.URLError, OSError, ValueError) as e:
        # ValueError covers a configured URL without a valid scheme
        # ("unknown url type"), which Request() rejects before any network access.
        log.warning(f"Auto-update: impossible de verifier la version distante: {e}")
        return False, f"Impossible de contacter le serveur: {e}"

    log.info(f"Auto-update: version locale={VERSION} distante={remote_version}")

    if _version_tuple(remote_version) <= _version_tuple(VERSION):
        log.info("Auto-update: programme a jour")
        return False, ""

    info = f"Version actuelle : {VERSION}\nVersion disponible : {remote_version}"
    log.info(f"Auto-update: MAJ disponible - {remote_version}")
    return True, info
|
|
|
|
|
|
def _unlink_quietly(path):
    """Best-effort removal of *path*; a missing file or an OS error is ignored."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        pass


def do_self_update():
    """Download the new executable and swap it in place of the running one.

    Strategy: download to ``<exe>.new``, rename the running executable to
    ``<exe>.old``, then rename ``<exe>.new`` to the executable's name. If the
    last step fails, the ``.old`` file is renamed back.

    Only supported when the program runs as a frozen executable.

    Returns:
        tuple ``(ok, message)``; ``message`` is meant to be shown to the user.
    """
    repo_url, exe_name, branch = _get_self_update_config()
    if not repo_url:
        return False, "Configuration [self-update] manquante"

    if not getattr(sys, "frozen", False):
        log.warning("Auto-update: mode script, telechargement non supporte")
        return False, "Auto-update uniquement supporte en mode .exe"

    exe_path = Path(sys.executable)
    exe_old_path = exe_path.with_suffix(".exe.old")
    exe_new_path = exe_path.with_suffix(".exe.new")

    # Download the new executable
    download_url = f"{repo_url}/raw/branch/{branch}/{exe_name}"
    log.info(f"Auto-update: telechargement de {download_url}")

    try:
        req = urllib.request.Request(download_url, headers={"User-Agent": "GitUpdateChecker"})
        total_bytes = 0
        with urllib.request.urlopen(req, timeout=60) as resp, open(exe_new_path, "wb") as f:
            while chunk := resp.read(65536):  # 64 KiB per block
                f.write(chunk)
                total_bytes += len(chunk)
    except (urllib.error.URLError, OSError, ValueError) as e:
        # ValueError: malformed URL in config.ini, rejected by Request().
        log.error(f"Auto-update: echec telechargement: {e}")
        _unlink_quietly(exe_new_path)
        return False, f"Erreur telechargement: {e}"

    # Anything this small is treated as an invalid download rather than an executable.
    if total_bytes < 1000:
        log.error(f"Auto-update: fichier telecharge trop petit ({total_bytes} octets)")
        _unlink_quietly(exe_new_path)
        return False, "Le fichier telecharge semble invalide"

    log.info(f"Auto-update: telecharge {total_bytes} octets -> {exe_new_path.name}")

    # Remove the "Mark of the Web" (Zone.Identifier stream) that Windows adds to
    # files downloaded over HTTP; it would block DLL loading at launch.
    try:
        os.remove(str(exe_new_path) + ":Zone.Identifier")
        log.info("Auto-update: Zone.Identifier supprime")
    except FileNotFoundError:
        pass  # No mark present, nothing to do
    except OSError as e:
        log.warning(f"Auto-update: impossible de supprimer Zone.Identifier: {e}")

    # Rename: current exe -> .old
    try:
        if exe_old_path.exists():
            exe_old_path.unlink()
        exe_path.rename(exe_old_path)
        log.info(f"Auto-update: {exe_path.name} -> {exe_old_path.name}")
    except OSError as e:
        log.error(f"Auto-update: impossible de renommer l'exe: {e}")
        _unlink_quietly(exe_new_path)
        return False, f"Impossible de renommer l'exe: {e}"

    # Rename: .new -> exe
    try:
        exe_new_path.rename(exe_path)
        log.info(f"Auto-update: {exe_new_path.name} -> {exe_path.name}")
    except OSError as e:
        log.error(f"Auto-update: impossible de placer le nouvel exe: {e}")
        # Roll back so the program can still be started next time.
        try:
            exe_old_path.rename(exe_path)
            log.info("Auto-update: ancien exe restaure")
        except OSError as restore_err:
            log.error(f"Auto-update: impossible de restaurer l'ancien exe: {restore_err}")
        return False, f"Impossible de placer le nouvel exe: {e}"

    return True, "Mise a jour reussie !\nLe programme va redemarrer."
|
|
|
|
|
|
def relaunch_program():
    """Start a fresh instance of the program.

    Frozen executable: writes ``_update.bat`` next to the program and runs it
    through ``cmd /c`` without a console window. The batch waits one second,
    starts the executable, deletes ``<exe>.old`` and then deletes itself.
    Script mode: starts ``sys.executable`` on this file.

    This function only spawns the new process; it does not exit the current one.
    """
    if not getattr(sys, "frozen", False):
        # Script mode: relaunch through the Python interpreter
        log.info("Auto-update: relance du script")
        subprocess.Popen([sys.executable, __file__])
        return

    exe_path = str(Path(sys.executable))
    log.info(f"Auto-update: relance de {exe_path}")

    bat_path = str(get_exe_dir() / "_update.bat")
    bat_lines = [
        "@echo off",
        "timeout /t 1 /nobreak >nul",
        f'start "" "{exe_path}"',
        f'del "{exe_path}.old" 2>nul',
        'del "%~f0"',
    ]
    bat_content = "".join(f"{line}\n" for line in bat_lines)

    with open(bat_path, "w", encoding="mbcs") as bat_file:
        bat_file.write(bat_content)

    subprocess.Popen(
        ["cmd", "/c", bat_path],
        creationflags=subprocess.CREATE_NO_WINDOW,
    )
|
|
|
|
|
|
# ── Configuration ────────────────────────────────────────────────────────────
|
|
|
|
def get_config_path():
    """Return the path of ``config.ini`` inside the program directory."""
    program_dir = get_exe_dir()
    return program_dir / "config.ini"
|
|
|
|
|
|
def load_repos():
    """Load the list of repositories declared in config.ini.

    Every ``[repo:<name>]`` section that has both a non-empty ``url`` and a
    non-empty ``path`` yields ``{"name": <name>, "url": ..., "path": ...}``.

    If config.ini does not exist, an example file is written and an empty list
    is returned. An unreadable file, or a section whose values cannot be read,
    is logged and skipped instead of raising.

    Returns:
        list[dict]: the configured repositories, in file order.
    """
    config_path = get_config_path()
    config = configparser.ConfigParser()

    if not config_path.exists():
        # Create an example config.ini
        config["repo:Exemple"] = {
            "url": "http://192.168.1.235:3125/user/repo",
            "path": "../MonRepo",
        }
        try:
            with open(config_path, "w", encoding="utf-8") as f:
                f.write("; Configuration des depots Git a surveiller\n")
                f.write("; Les chemins (path) sont relatifs a l'emplacement de l'exe\n")
                f.write("; Ajouter autant de sections [repo:NomDuRepo] que necessaire\n\n")
                config.write(f)
        except OSError as e:
            # Read-only location: the example is optional, keep going.
            log.warning(f"Impossible de creer {config_path.name}: {e}")
        return []

    try:
        config.read(config_path, encoding="utf-8")
    except (configparser.Error, UnicodeDecodeError) as e:
        log.error(f"config.ini illisible: {e}")
        return []

    repos = []
    for section in config.sections():
        if not section.startswith("repo:"):
            continue
        try:
            url = config.get(section, "url", fallback="").strip()
            path = config.get(section, "path", fallback="").strip()
        except configparser.Error as e:
            # e.g. a stray '%' in a value triggers an interpolation error.
            log.warning(f"config.ini: section [{section}] ignoree: {e}")
            continue
        if url and path:
            repos.append({"name": section[5:], "url": url, "path": path})

    return repos
|
|
|
|
|
|
# ── Logique Git ──────────────────────────────────────────────────────────────
|
|
|
|
# Lower-case fragments of the fetch error text that mean the remote cannot be
# reached. "timeout" matches the "Timeout" message run_git() returns when the
# command exceeds its time limit.
_OFFLINE_KEYWORDS = (
    "could not resolve",
    "connection refused",
    "unable to connect",
    "timed out",
    "the remote end hung up",
    "timeout",
)

# Display labels for the first letter of a `git diff --name-status` code.
_LOCAL_STATUS_LABELS = {"D": "Supprime localement", "M": "Modifie localement"}
_REMOTE_STATUS_LABELS = {"A": "Ajoute", "M": "Modifie", "D": "Supprime", "R": "Renomme"}


def _parse_name_status(output, labels):
    """Parse ``git diff --name-status`` output into a list of change dicts.

    Each entry has ``status`` (label from *labels*, or the raw code when the
    letter is unknown), ``status_char`` (first letter of the code) and
    ``file``. Lines without a tab or without a status code are skipped. For
    lines carrying two tab-separated paths, they are shown as ``old -> new``.
    """
    entries = []
    for line in output.splitlines():
        code, sep, filename = line.partition("\t")
        code = code.strip()
        if not sep or not code:
            continue
        entries.append({
            "status": labels.get(code[0], code),
            "status_char": code[0],
            "file": filename.strip().replace("\t", " -> "),
        })
    return entries


def check_repo(repo):
    """Inspect one configured repository and return its state.

    Args:
        repo: dict with ``name``, ``url`` and ``path`` (as built by load_repos).

    Returns:
        dict describing the repository. Notable keys:
        ``needs_clone`` (no ``.git`` folder and the target is absent or empty),
        ``error`` (message, or None), ``offline`` (the fetch failed because the
        remote is unreachable), ``up_to_date``, ``local_only`` (same commit as
        the remote but local files modified or deleted), ``commits`` and
        ``files`` (incoming changes), ``local_changes``, ``branch``,
        ``local_hash`` / ``remote_hash`` (first 8 characters).
    """
    name = repo["name"]
    url = repo["url"]
    local_path = str(resolve_relative(repo["path"]))
    log.info(f"[{name}] Verification - {url} -> {local_path}")

    result = {
        "name": name,
        "url": url,
        "local_path": local_path,
        "relative_path": repo["path"],
        "exists": False,
        "up_to_date": False,
        "error": None,
        "commits": [],
        "files": [],
        "local_changes": [],
        "local_only": False,
        "branch": "",
        "local_hash": "",
        "remote_hash": "",
        "needs_clone": False,
        "offline": False,
    }

    # Check that the folder exists and is a git repository
    if not os.path.isdir(os.path.join(local_path, ".git")):
        if os.path.isdir(local_path) and os.listdir(local_path):
            result["error"] = "Le dossier existe mais n'est pas un depot git."
            return result
        result["needs_clone"] = True
        return result

    result["exists"] = True

    # Current branch
    code, branch, _ = run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=local_path)
    if code != 0:
        result["error"] = "Impossible de lire la branche courante."
        return result
    result["branch"] = branch

    # Point origin at the URL from config.ini in case it changed there
    run_git(["remote", "set-url", "origin", url], cwd=local_path)

    # Fetch (also detects an unreachable remote)
    code, _, err = run_git(["fetch", "origin"], cwd=local_path)
    if code != 0:
        lowered = err.lower()
        if any(keyword in lowered for keyword in _OFFLINE_KEYWORDS):
            log.warning(f"[{name}] Remote inaccessible: {url}")
            result["error"] = "Depot hors ligne (remote inaccessible)"
            result["offline"] = True
        else:
            result["error"] = f"Erreur fetch : {err}"
        return result

    # Compare local HEAD with origin
    code, local_hash, _ = run_git(["rev-parse", "HEAD"], cwd=local_path)
    if code != 0:
        result["error"] = "Impossible de lire le commit local."
        return result

    code, remote_hash, _ = run_git(["rev-parse", f"origin/{branch}"], cwd=local_path)
    if code != 0:
        result["error"] = f"Impossible de trouver origin/{branch}."
        return result

    result["local_hash"] = local_hash[:8]
    result["remote_hash"] = remote_hash[:8]

    # Local modifications (working tree compared with HEAD)
    code, local_diff, _ = run_git(["diff", "--name-status", "HEAD"], cwd=local_path)
    if code == 0 and local_diff:
        result["local_changes"] = _parse_name_status(local_diff, _LOCAL_STATUS_LABELS)

    if local_hash == remote_hash:
        if result["local_changes"]:
            log.info(f"[{name}] {len(result['local_changes'])} fichier(s) locaux modifies/supprimes")
            result["local_only"] = True
        else:
            log.info(f"[{name}] A jour ({local_hash[:8]})")
            result["up_to_date"] = True
        return result

    # New remote commits
    code, log_out, _ = run_git(
        ["log", "--oneline", "--format=%h|%an|%ar|%s", f"HEAD..origin/{branch}"],
        cwd=local_path,
    )
    if code == 0 and log_out:
        for line in log_out.splitlines():
            # maxsplit=3 keeps any "|" inside the commit subject intact.
            parts = line.split("|", 3)
            if len(parts) == 4:
                result["commits"].append({
                    "hash": parts[0], "author": parts[1],
                    "date": parts[2], "message": parts[3],
                })

    # Files changed on the remote side
    code, diff_out, _ = run_git(
        ["diff", "--name-status", f"HEAD..origin/{branch}"],
        cwd=local_path,
    )
    if code == 0 and diff_out:
        result["files"] = _parse_name_status(diff_out, _REMOTE_STATUS_LABELS)

    return result
|
|
|
|
|
|
def do_clone(repo):
    """Clone ``repo["url"]`` into the resolved ``repo["path"]``.

    The git command gets a 300-second time limit.

    Returns:
        tuple ``(ok, stderr)`` where ``ok`` is True when git exited with 0.
    """
    target = str(resolve_relative(repo["path"]))
    log.info(f"Clonage: {repo['url']} -> {target}")
    code, _, err = run_git(["clone", repo["url"], target], timeout=300)
    ok = code == 0
    if not ok:
        log.error(f"Clonage echoue: {err}")
    else:
        log.info(f"Clonage reussi: {target}")
    return ok, err
|
|
|
|
|
|
def do_pull(local_path, branch):
    """Run ``git pull origin <branch>`` in *local_path* (120-second limit).

    Returns:
        tuple ``(ok, stdout, stderr)`` where ``ok`` is True when git exited with 0.
    """
    log.info(f"Pull: {local_path} (branche {branch})")
    code, out, err = run_git(["pull", "origin", branch], cwd=local_path, timeout=120)
    ok = code == 0
    if not ok:
        log.error(f"Pull echoue: {err}")
    else:
        log.info(f"Pull reussi: {local_path}")
    return ok, out, err
|
|
|
|
|
|
def do_restore(local_path):
    """Run ``git checkout -- .`` in *local_path* to restore modified or deleted files.

    Returns:
        tuple ``(ok, stderr)`` where ``ok`` is True when git exited with 0.
    """
    log.info(f"Restauration: {local_path}")
    code, _, err = run_git(["checkout", "--", "."], cwd=local_path)
    ok = code == 0
    if not ok:
        log.error(f"Restauration echouee: {err}")
    else:
        log.info(f"Restauration reussie: {local_path}")
    return ok, err
|
|
|
|
|
|
# ── Interface graphique ──────────────────────────────────────────────────────
|
|
|
|
class App(tk.Tk):
|
|
@staticmethod
def _find_icon(filename="icon.png"):
    """Locate an icon file.

    Looks next to the program first; when running frozen with
    ``sys._MEIPASS`` available, that bundle folder is tried second.

    Returns:
        Path of the first existing candidate, or None when none exists.
    """
    candidates = [get_exe_dir() / filename]
    if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
        candidates.append(Path(sys._MEIPASS) / filename)
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return None
|
|
|
|
def __init__(self):
    """Create the main window, load the configuration and schedule the first check."""
    super().__init__()
    self.title(f"Git Update Checker v{VERSION}")
    self.geometry("820x600")
    self.minsize(700, 450)
    self.configure(bg="#1e1e2e")

    self.repos_config = load_repos()
    self.repo_results = []

    # Window icon (title bar + taskbar). The image is stored on the instance
    # so that a reference to it stays alive.
    self._app_icon = None
    icon_path = self._find_icon()
    if icon_path is not None:
        try:
            icon = tk.PhotoImage(file=str(icon_path))
            self._app_icon = icon
            self.iconphoto(True, icon)
        except Exception as exc:
            log.warning(f"Icone fenetre: {exc}")
            self._app_icon = None

    log.info(f"=== Demarrage Git Update Checker v{VERSION} ===")
    self._cleanup_old_exe()
    self._build_ui()
    # Start the self-update check shortly after the constructor returns.
    self.after(100, self._check_self_update_then_repos)
|
|
|
|
def _cleanup_old_exe(self):
    """Delete a leftover ``.exe.old`` file from a previous self-update (frozen mode only)."""
    if not getattr(sys, "frozen", False):
        return
    leftover = Path(sys.executable).with_suffix(".exe.old")
    if not leftover.exists():
        return
    try:
        leftover.unlink()
    except OSError:
        log.warning(f"Nettoyage: impossible de supprimer {leftover.name}")
    else:
        log.info(f"Nettoyage: {leftover.name} supprime")
|
|
|
|
def _build_ui(self):
    """Build every widget of the main window.

    Layout, top to bottom: header (optional small icon, title, status label),
    clock label, a vertical paned area (scrollable repository cards above, log
    panel below) and a bottom button bar. Widgets that other methods use are
    stored on ``self`` (status_label, date_label, canvas, scroll_frame,
    log_text, btn_refresh, btn_update_all, btn_open_log).
    """
    style = ttk.Style(self)
    style.theme_use("clam")

    # Dark colour palette
    bg = "#1e1e2e"
    fg = "#cdd6f4"
    accent = "#89b4fa"
    green = "#a6e3a1"
    yellow = "#f9e2af"
    red = "#f38ba8"

    style.configure("TFrame", background=bg)
    style.configure("TLabel", background=bg, foreground=fg, font=("Segoe UI", 10))
    style.configure("Title.TLabel", background=bg, foreground=fg, font=("Segoe UI", 14, "bold"))
    style.configure("Status.TLabel", background=bg, foreground=accent, font=("Segoe UI", 9))
    style.configure("TButton", font=("Segoe UI", 10))
    style.configure("Green.TLabel", background=bg, foreground=green, font=("Segoe UI", 10, "bold"))
    style.configure("Yellow.TLabel", background=bg, foreground=yellow, font=("Segoe UI", 10, "bold"))
    style.configure("Red.TLabel", background=bg, foreground=red, font=("Segoe UI", 10, "bold"))

    # Header
    header = ttk.Frame(self)
    header.pack(fill="x", padx=15, pady=(15, 5))

    # Icon in the top-left corner (icon_small.png, pre-generated at 32x32)
    small_path = self._find_icon("icon_small.png")
    if small_path:
        try:
            # Stored on self so a reference to the image stays alive.
            self._icon_small = tk.PhotoImage(file=str(small_path))
            tk.Label(header, image=self._icon_small, bg=bg, bd=0).pack(side="left", padx=(0, 8))
        except Exception as e:
            log.warning(f"Icone header: {e}")

    ttk.Label(header, text=f"Git Update Checker v{VERSION}", style="Title.TLabel").pack(side="left")
    self.status_label = ttk.Label(header, text="Verification en cours...", style="Status.TLabel")
    self.status_label.pack(side="right")

    # Date label (refreshed every second by _tick_clock)
    self.date_label = ttk.Label(self, text="", style="Status.TLabel")
    self.date_label.pack(anchor="w", padx=15)
    self._tick_clock()

    # Main area (vertical PanedWindow: cards on top, log at the bottom)
    paned = ttk.PanedWindow(self, orient="vertical")
    paned.pack(fill="both", expand=True, padx=15, pady=10)

    # Scrollable area for the repository cards
    container = ttk.Frame(paned)
    paned.add(container, weight=3)

    self.canvas = tk.Canvas(container, bg=bg, highlightthickness=0)
    scrollbar = ttk.Scrollbar(container, orient="vertical", command=self.canvas.yview)
    self.scroll_frame = ttk.Frame(self.canvas)

    # Keep the canvas scroll region in sync with the size of the inner frame.
    self.scroll_frame.bind("<Configure>", lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all")))
    self.canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw")
    self.canvas.configure(yscrollcommand=scrollbar.set)

    self.canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y")

    # Mouse-wheel scrolling: the handler picks the widget under the cursor
    self.bind_all("<MouseWheel>", self._on_mousewheel)

    # Log panel at the bottom
    log_frame = tk.Frame(paned, bg="#181825")
    paned.add(log_frame, weight=1)

    log_header = tk.Frame(log_frame, bg="#181825")
    log_header.pack(fill="x", padx=8, pady=(6, 2))
    tk.Label(log_header, text="Journal des operations", bg="#181825", fg="#a6adc8", font=("Segoe UI", 9, "bold")).pack(side="left")
    tk.Button(log_header, text="Effacer", bg="#313244", fg="#cdd6f4", bd=0, font=("Segoe UI", 8),
              command=self._clear_log_panel, activebackground="#45475a", activeforeground="#cdd6f4").pack(side="right")

    # The text widget is created disabled; _log_gui enables it only while inserting.
    self.log_text = tk.Text(log_frame, bg="#11111b", fg="#cdd6f4", font=("Consolas", 9),
                            height=8, bd=0, highlightthickness=0, wrap="word",
                            state="disabled", padx=8, pady=4)
    log_scroll = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview)
    self.log_text.configure(yscrollcommand=log_scroll.set)

    self.log_text.pack(side="left", fill="both", expand=True, padx=(8, 0), pady=(0, 8))
    log_scroll.pack(side="right", fill="y", padx=(0, 8), pady=(0, 8))

    # Colour tags for the GUI log
    self.log_text.tag_configure("info", foreground="#cdd6f4")
    self.log_text.tag_configure("success", foreground="#a6e3a1")
    self.log_text.tag_configure("warning", foreground="#f9e2af")
    self.log_text.tag_configure("error", foreground="#f38ba8")
    self.log_text.tag_configure("file_add", foreground="#a6e3a1")
    self.log_text.tag_configure("file_mod", foreground="#f9e2af")
    self.log_text.tag_configure("file_del", foreground="#f38ba8")
    self.log_text.tag_configure("dim", foreground="#6c7086")

    # Bottom button bar
    btn_frame = ttk.Frame(self)
    btn_frame.pack(fill="x", padx=15, pady=(0, 15))

    self.btn_refresh = ttk.Button(btn_frame, text="Rafraichir", command=self._start_check)
    self.btn_refresh.pack(side="left", padx=(0, 10))

    # Starts disabled; _display_results enables it when at least one update is pending.
    self.btn_update_all = ttk.Button(btn_frame, text="Tout mettre a jour", command=self._update_all)
    self.btn_update_all.pack(side="left")
    self.btn_update_all.state(["disabled"])

    ttk.Button(btn_frame, text="Ouvrir config.ini", command=self._open_config).pack(side="right")

    self.btn_open_log = ttk.Button(btn_frame, text="Ouvrir les logs", command=self._open_log)
    self.btn_open_log.pack(side="right", padx=(0, 10))
|
|
|
|
def _tick_clock(self):
    """Refresh the date label and re-schedule itself to run again in one second."""
    stamp = datetime.now().strftime(" %d/%m/%Y %H:%M:%S")
    self.date_label.configure(text=stamp)
    self.after(1000, self._tick_clock)
|
|
|
|
def _on_mousewheel(self, event):
    """Scroll the card area or the log panel, depending on where the event came from.

    Walks up from ``event.widget`` through its ``master`` chain until it meets
    the canvas / scroll frame (scrolls the canvas) or the log text widget
    (scrolls the log). Events from anywhere else are ignored.
    """
    node = event.widget
    while node is not None:
        if node is self.canvas or node is self.scroll_frame:
            target = self.canvas
        elif node is self.log_text:
            target = self.log_text
        else:
            target = None

        if target is not None:
            target.yview_scroll(int(-1 * (event.delta / 120)), "units")
            return

        try:
            node = node.master
        except AttributeError:
            # The object has no master attribute: stop searching.
            break
|
|
|
|
def _check_self_update_then_repos(self):
    """Check for a program update in a background thread, then hand over to the GUI.

    The result is passed to ``_handle_self_update`` through ``after``. An
    unexpected exception in the check is logged and treated as "no update",
    so that the repository check still starts.
    """
    self.status_label.configure(text="Verification auto-update...")

    def work():
        try:
            needs, info = check_self_update()
        except Exception as e:
            # Without this guard the thread would die and the follow-up
            # callback would never be scheduled.
            log.error(f"Auto-update: erreur inattendue: {e}")
            needs, info = False, ""
        self.after(0, lambda: self._handle_self_update(needs, info))

    threading.Thread(target=work, daemon=True).start()
|
|
|
|
def _log_gui(self, message, tag="info"):
    """Append a timestamped line to the log panel.

    Args:
        message: text to display.
        tag: name of the text tag applied to the message (default "info").

    When called from a thread other than the main one, the insertion is
    scheduled with ``after`` instead of being done directly.
    """
    def _append():
        timestamp = datetime.now().strftime("%H:%M:%S")
        panel = self.log_text
        panel.configure(state="normal")
        panel.insert("end", f"[{timestamp}] ", "dim")
        panel.insert("end", f"{message}\n", tag)
        panel.see("end")
        panel.configure(state="disabled")

    on_main_thread = threading.current_thread() is threading.main_thread()
    if not on_main_thread:
        self.after(0, _append)
    else:
        _append()
|
|
|
|
def _clear_log_panel(self):
    """Remove all text from the log panel, leaving the widget disabled again."""
    panel = self.log_text
    panel.configure(state="normal")
    panel.delete("1.0", "end")
    panel.configure(state="disabled")
|
|
|
|
def _handle_self_update(self, needs_update, info):
    """React to the result of the self-update check.

    When an update is available and the user accepts it, the download runs in
    a background thread and ``_self_update_done`` receives the outcome. In
    every other case the repository check starts right away.

    Args:
        needs_update: True when a newer version is available.
        info: text describing the versions, shown in the confirmation dialog.
    """
    if needs_update:
        answer = messagebox.askyesno(
            "Mise a jour du programme",
            f"Une mise a jour du programme est disponible !\n\n{info}\n\nMettre a jour maintenant ?",
        )
        if answer:
            self.status_label.configure(text="Mise a jour du programme...")

            def work():
                try:
                    ok, msg = do_self_update()
                except Exception as e:
                    # Report the failure instead of letting the thread die,
                    # which would leave the window waiting forever.
                    log.error(f"Auto-update: erreur inattendue: {e}")
                    ok, msg = False, f"Erreur inattendue: {e}"
                self.after(0, lambda: self._self_update_done(ok, msg))

            threading.Thread(target=work, daemon=True).start()
            return

    self._start_check()
|
|
|
|
def _self_update_done(self, ok, msg):
    """Finish the self-update.

    On success: show *msg*, relaunch the program and close this window.
    On failure: show *msg* as a warning and continue with the repository check.
    """
    if not ok:
        messagebox.showwarning("Auto-update", msg)
        self._start_check()
        return

    messagebox.showinfo("Auto-update", msg)
    log.info("Auto-update appliquee, relance...")
    relaunch_program()
    self.destroy()
|
|
|
|
def _start_check(self):
    """Reload config.ini and start checking the repositories in a background thread.

    Both action buttons are disabled first. When no repository is configured,
    the cards are cleared, the "no repos" view is shown and the refresh
    button is re-enabled.
    """
    for button in (self.btn_refresh, self.btn_update_all):
        button.state(["disabled"])
    self.status_label.configure(text="Verification en cours...")
    self._log_gui("Verification des depots...", "info")
    self.repos_config = load_repos()

    if self.repos_config:
        threading.Thread(target=self._check_all, daemon=True).start()
        return

    self._clear_cards()
    self.status_label.configure(text="Aucun depot configure")
    self._show_no_repos()
    self.btn_refresh.state(["!disabled"])
|
|
|
|
def _check_all(self):
    """Check every configured repository in parallel (at most 4 at a time).

    Runs in a background thread. Results are stored in ``self.repo_results``
    in configuration order, then ``_display_results`` is scheduled on the GUI
    side. If checking one repository raises, that repository gets an error
    result and the others still complete.
    """
    # Snapshot so the indices stay valid for the whole run.
    repos = list(self.repos_config)
    total = len(repos)
    results = [None] * total
    done = 0

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {}
        for i, repo in enumerate(repos):
            self._log_gui(f"[{repo['name']}] Verification...", "dim")
            futures[executor.submit(check_repo, repo)] = i

        for future in as_completed(futures):
            idx = futures[future]
            try:
                res = future.result()
            except Exception as e:
                # Build a result with the same keys check_repo() produces, so
                # the card for this repository can still be drawn.
                repo = repos[idx]
                log.exception(f"[{repo['name']}] Erreur inattendue pendant la verification")
                res = {
                    "name": repo["name"],
                    "url": repo.get("url", ""),
                    "local_path": "",
                    "relative_path": repo.get("path", ""),
                    "exists": False,
                    "up_to_date": False,
                    "error": f"Erreur inattendue : {e}",
                    "commits": [],
                    "files": [],
                    "local_changes": [],
                    "local_only": False,
                    "branch": "",
                    "local_hash": "",
                    "remote_hash": "",
                    "needs_clone": False,
                    "offline": False,
                }
            results[idx] = res
            done += 1

            # One-line summary depending on the status
            if res.get("offline"):
                self._log_gui(f"[{res['name']}] Hors ligne", "warning")
            elif res.get("error"):
                self._log_gui(f"[{res['name']}] Erreur : {res['error']}", "error")
            elif res.get("needs_clone"):
                self._log_gui(f"[{res['name']}] A cloner", "warning")
            elif res["up_to_date"]:
                self._log_gui(f"[{res['name']}] A jour", "success")
            else:
                count = len(res["commits"]) + len(res["local_changes"])
                self._log_gui(f"[{res['name']}] {count} changement(s) disponible(s)", "warning")

            # Defaults bind the current values; a plain closure would read
            # them only when the callback runs.
            self.after(0, lambda d=done, t=total: self.status_label.configure(text=f"Verification {d}/{t}..."))

    self.repo_results = results
    self.after(0, self._display_results)
|
|
|
|
def _display_results(self):
    """Rebuild the repository cards from ``self.repo_results`` and update the status widgets.

    The "update all" button is enabled when at least one repository is not up
    to date and has no error, is not offline and does not need cloning.
    """
    self._clear_cards()

    results = self.repo_results
    for res in results:
        self._create_card(res).pack(fill="x", pady=5)

    has_any_updates = any(
        not (res["up_to_date"] or res.get("error") or res.get("offline") or res.get("needs_clone"))
        for res in results
    )

    total = len(results)
    up = sum(bool(r["up_to_date"]) for r in results)
    log.info(f"Resultat: {up}/{total} depots a jour")
    summary_tag = "success" if up == total else "warning"
    self._log_gui(f"Verification terminee : {up}/{total} depots a jour", summary_tag)
    self.status_label.configure(text=f"{up}/{total} depots a jour")
    self.btn_refresh.state(["!disabled"])

    if has_any_updates:
        self.btn_update_all.state(["!disabled"])
|
|
|
|
def _clear_cards(self):
    """Destroy every child widget of the scrollable card frame."""
    children = self.scroll_frame.winfo_children()
    for child in children:
        child.destroy()
|
|
|
|
def _create_card(self, res):
    """Build the card widget for one repository result.

    Args:
        res: result dict for the repository; the keys read here are name,
            offline, error, needs_clone, up_to_date, commits, files,
            local_changes, relative_path, branch and local_hash.

    Returns:
        The card ``tk.Frame`` (child of ``self.scroll_frame``), not yet packed.
        When an action button is created, it is also stored in ``res["_btn"]``.
    """
    bg_card = "#313244"
    fg = "#cdd6f4"
    green = "#a6e3a1"
    yellow = "#f9e2af"
    red = "#f38ba8"
    cyan = "#89dceb"
    dim = "#6c7086"

    card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a")

    # Card header: repository name on the left, status badge on the right
    top = tk.Frame(card, bg=bg_card)
    top.pack(fill="x", padx=12, pady=(10, 5))

    tk.Label(top, text=res["name"], bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(side="left")

    # Badge priority: offline, then error, then needs clone, then up to date.
    if res.get("offline"):
        tk.Label(top, text="HORS LIGNE", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")
    elif res.get("error"):
        tk.Label(top, text="ERREUR", bg=bg_card, fg=red, font=("Segoe UI", 9, "bold")).pack(side="right")
    elif res.get("needs_clone"):
        tk.Label(top, text="A CLONER", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")
    elif res["up_to_date"]:
        tk.Label(top, text="A JOUR", bg=bg_card, fg=green, font=("Segoe UI", 9, "bold")).pack(side="right")
    else:
        # Pending changes = incoming commits + local files to restore
        count = len(res["commits"]) + len(res["local_changes"])
        tk.Label(top, text=f"MAJ DISPO ({count})", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")

    # Info lines: configured path, then branch and short local hash
    info = tk.Frame(card, bg=bg_card)
    info.pack(fill="x", padx=12)
    tk.Label(info, text=f"Chemin : {res['relative_path']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w")
    if res["branch"]:
        tk.Label(info, text=f"Branche : {res['branch']} | {res['local_hash']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w")

    # Error message
    if res.get("error"):
        tk.Label(card, text=f" {res['error']}", bg=bg_card, fg=red, font=("Segoe UI", 9), anchor="w").pack(fill="x", padx=12, pady=(5, 0))

    # Remote commits (the first 5 are listed, the rest is summarised)
    if res["commits"]:
        tk.Label(card, text=f"Nouveaux commits ({len(res['commits'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(8, 2))
        for c in res["commits"][:5]:
            line_frame = tk.Frame(card, bg=bg_card)
            line_frame.pack(fill="x", padx=20)
            tk.Label(line_frame, text=c["hash"], bg=bg_card, fg=cyan, font=("Consolas", 9)).pack(side="left")
            tk.Label(line_frame, text=f" {c['message']}", bg=bg_card, fg=fg, font=("Segoe UI", 9), anchor="w").pack(side="left", fill="x")
        if len(res["commits"]) > 5:
            tk.Label(card, text=f" ... et {len(res['commits']) - 5} autres commits", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)

    # Files changed on the remote (the first 8 are listed)
    if res["files"]:
        tk.Label(card, text=f"Fichiers distants modifies ({len(res['files'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2))
        color_map = {"A": green, "M": yellow, "D": red, "R": cyan}
        for f in res["files"][:8]:
            line_frame = tk.Frame(card, bg=bg_card)
            line_frame.pack(fill="x", padx=20)
            # Here `c` is the label colour for this file's status letter.
            c = color_map.get(f["status_char"], fg)
            tk.Label(line_frame, text=f"[{f['status']:>9}]", bg=bg_card, fg=c, font=("Consolas", 9)).pack(side="left")
            tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left")
        if len(res["files"]) > 8:
            tk.Label(card, text=f" ... et {len(res['files']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)

    # Local files to restore (the first 8 are listed)
    if res["local_changes"]:
        tk.Label(card, text=f"Fichiers locaux a restaurer ({len(res['local_changes'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2))
        for f in res["local_changes"][:8]:
            line_frame = tk.Frame(card, bg=bg_card)
            line_frame.pack(fill="x", padx=20)
            # Red for the "D" status, yellow for any other status.
            c = red if f["status_char"] == "D" else yellow
            tk.Label(line_frame, text=f"[{f['status']:>20}]", bg=bg_card, fg=c, font=("Consolas", 8)).pack(side="left")
            tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left")
        if len(res["local_changes"]) > 8:
            tk.Label(card, text=f" ... et {len(res['local_changes']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)

    # Action button: only for results that have no error and are not up to date
    if not res["up_to_date"] and not res.get("error"):
        btn_frame = tk.Frame(card, bg=bg_card)
        btn_frame.pack(fill="x", padx=12, pady=(8, 0))

        # `r=res` binds this card's result to the callback as a default argument.
        if res.get("needs_clone"):
            btn = ttk.Button(btn_frame, text="Cloner le depot", command=lambda r=res: self._do_clone(r))
        else:
            btn = ttk.Button(btn_frame, text="Mettre a jour", command=lambda r=res: self._do_update(r))
        btn.pack(side="left")
        # Keep a reference to the button so it can be disabled later
        res["_btn"] = btn

    # Bottom padding
    tk.Frame(card, bg=bg_card, height=10).pack()

    return card
|
|
|
|
def _do_update(self, res, batch=False):
    """Update one repository in a background thread (pull, then restore).

    Args:
        res: Check-result dict for the repository. Keys read here: "name",
            "local_path", "branch", "commits", "local_changes", and
            optionally "files" and "_btn" (the card's action button).
        batch: Forwarded to ``_show_update_result``; the global update
            passes True.

    The method returns as soon as the worker thread is started. The outcome
    is reported through ``_show_update_result``, scheduled with
    ``self.after``. A failed pull does not prevent the restore step from
    being attempted.
    """
    name = res["name"]
    log.info(f"[{name}] MAJ unitaire demandee")
    self._log_gui(f"[{name}] Mise a jour en cours...", "info")
    if "_btn" in res:
        # Disable the card's button while the update is running.
        res["_btn"].state(["disabled"])

    def work():
        messages = []
        success = True
        # Maps a file status letter to the tag passed to _log_gui.
        tag_map = {"A": "file_add", "M": "file_mod", "D": "file_del", "R": "file_mod"}

        try:
            local_path = res["local_path"]
            branch = res["branch"]

            if res["commits"]:
                log.info(f"[{name}] Pull de {len(res['commits'])} commit(s)...")
                self._log_gui(f"[{name}] Telechargement de {len(res['commits'])} commit(s)...", "info")
                ok, _, err = do_pull(local_path, branch)
                if ok:
                    msg = f"Pull OK : {len(res['commits'])} commits telecharges."
                    log.info(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "success")
                    # Log every remote file that was part of the pull.
                    for f in res.get("files", []):
                        tag = tag_map.get(f["status_char"], "info")
                        self._log_gui(f" [{f['status']:>9}] {f['file']}", tag)
                    messages.append(msg)
                else:
                    msg = f"Erreur pull : {err}"
                    log.error(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "error")
                    messages.append(msg)
                    success = False

            if res["local_changes"]:
                log.info(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...")
                self._log_gui(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...", "info")
                ok, err = do_restore(local_path)
                if ok:
                    msg = f"Restauration OK : {len(res['local_changes'])} fichiers restaures."
                    log.info(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "success")
                    # Log every restored file.
                    for f in res["local_changes"]:
                        tag = tag_map.get(f["status_char"], "info")
                        self._log_gui(f" [Restaure] {f['file']}", tag)
                    messages.append(msg)
                else:
                    msg = f"Erreur restauration : {err}"
                    log.error(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "error")
                    messages.append(msg)
                    success = False
        except Exception as exc:
            # Thread boundary: without this, an unexpected error would kill
            # the worker before _show_update_result is scheduled, leaving
            # the button disabled and, in batch mode, the remaining-updates
            # counter stuck above zero.
            success = False
            msg = f"Erreur inattendue : {exc}"
            log.exception(f"[{name}] {msg}")
            self._log_gui(f"[{name}] {msg}", "error")
            messages.append(msg)

        status = "SUCCES" if success else "ECHEC"
        log.info(f"[{name}] MAJ unitaire terminee - {status}")
        self._log_gui(f"[{name}] Termine - {status}", "success" if success else "error")
        # Hand the result over via the Tk scheduler.
        self.after(0, lambda: self._show_update_result(res, messages, success, batch=batch))

    threading.Thread(target=work, daemon=True).start()
|
|
|
|
def _do_clone(self, res):
    """Clone a repository in a background thread.

    Args:
        res: Check-result dict for the repository. Keys read here: "name",
            "url", "relative_path", and optionally "_btn" (the card's
            action button).

    The method returns as soon as the worker thread is started. When the
    clone finishes, a dialog reports the outcome (an error dialog on
    failure) and ``_start_check`` is scheduled to refresh the view.
    """
    name = res["name"]
    log.info(f"[{name}] Clonage demande - {res['url']}")
    self._log_gui(f"[{name}] Clonage en cours depuis {res['url']}...", "info")
    if "_btn" in res:
        # Disable the card's button while the clone is running.
        res["_btn"].state(["disabled"])

    repo = {"url": res["url"], "path": res["relative_path"]}

    def work():
        try:
            ok, err = do_clone(repo)
        except Exception as exc:
            # Thread boundary: turn an unexpected error into a reported
            # failure so the dialog and the refresh below still happen.
            log.exception(f"[{name}] Exception inattendue pendant le clonage")
            ok, err = False, exc

        if ok:
            msg = f"Depot '{name}' clone avec succes !"
            log.info(f"[{name}] {msg}")
            self._log_gui(f"[{name}] {msg}", "success")
            notify = messagebox.showinfo
        else:
            msg = f"Erreur de clonage : {err}"
            log.error(f"[{name}] {msg}")
            self._log_gui(f"[{name}] {msg}", "error")
            # A failure is shown with the error icon, not the info one.
            notify = messagebox.showerror

        self.after(0, lambda: notify("Clonage", msg))
        self.after(100, self._start_check)

    threading.Thread(target=work, daemon=True).start()
|
|
|
|
def _show_update_result(self, res, messages, success, batch=False):
|
|
if batch:
|
|
# En mode batch : pas de messagebox individuelle, on ne rafraichit qu'une fois à la fin
|
|
self._batch_remaining -= 1
|
|
if self._batch_remaining == 0:
|
|
self._start_check()
|
|
else:
|
|
title = "Mise a jour" if success else "Erreur"
|
|
messagebox.showinfo(title, f"{res['name']}\n\n" + "\n".join(messages))
|
|
self._start_check()
|
|
|
|
def _update_all(self):
|
|
"""Met à jour tous les dépôts qui ont des MAJ."""
|
|
to_update = [r for r in self.repo_results if not r["up_to_date"] and not r.get("error") and not r.get("needs_clone")]
|
|
if not to_update:
|
|
return
|
|
log.info(f"MAJ globale demandee - {len(to_update)} depot(s) a mettre a jour")
|
|
self._log_gui(f"MAJ globale : {len(to_update)} depot(s) a mettre a jour", "warning")
|
|
# Compteur pour n'appeler _start_check qu'une seule fois quand tous sont termines
|
|
self._batch_remaining = len(to_update)
|
|
for res in to_update:
|
|
self._do_update(res, batch=True)
|
|
|
|
def _show_no_repos(self):
    """Add a card to ``self.scroll_frame`` saying no repository is configured.

    The card holds a title, a hint to edit config.ini, and a sample
    ``[repo:...]`` section.
    """
    card_bg = "#313244"
    text_fg = "#cdd6f4"
    hint_fg = "#6c7086"

    card = tk.Frame(
        self.scroll_frame,
        bg=card_bg,
        bd=0,
        highlightthickness=1,
        highlightbackground="#45475a",
    )
    card.pack(fill="x", pady=5)

    title = tk.Label(
        card,
        text="Aucun depot configure",
        bg=card_bg,
        fg=text_fg,
        font=("Segoe UI", 11, "bold"),
    )
    title.pack(padx=12, pady=(12, 5))

    hint = tk.Label(
        card,
        text="Edite config.ini pour ajouter des depots :",
        bg=card_bg,
        fg=hint_fg,
        font=("Segoe UI", 9),
    )
    hint.pack(padx=12)

    # Sample configuration section shown verbatim.
    example = (
        "[repo:MonRepo]\n"
        "url = http://192.168.1.235:3125/user/repo\n"
        "path = ../MonRepo"
    )
    sample = tk.Label(
        card,
        text=example,
        bg="#1e1e2e",
        fg="#a6e3a1",
        font=("Consolas", 9),
        justify="left",
        padx=10,
        pady=8,
    )
    sample.pack(padx=12, pady=8, fill="x")

    # Bottom padding.
    spacer = tk.Frame(card, bg=card_bg, height=10)
    spacer.pack()
|
|
|
|
def _open_config(self):
    """Open the configuration file with the platform's default application.

    Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
    elsewhere. A failure to launch is logged and shown in the GUI log
    instead of propagating.
    """
    config_path = str(get_config_path())
    try:
        if sys.platform == "win32":
            os.startfile(config_path)
        elif sys.platform == "darwin":
            subprocess.Popen(["open", config_path])
        else:
            subprocess.Popen(["xdg-open", config_path])
    except OSError as exc:
        # Covers a missing file as well as a missing opener command.
        msg = f"Impossible d'ouvrir {config_path} : {exc}"
        log.error(msg)
        self._log_gui(msg, "error")
|
|
|
|
def _open_log(self):
    """Open the ``log`` directory with the platform's file manager.

    Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
    elsewhere. A failure to launch is logged and shown in the GUI log
    instead of propagating.
    """
    log_dir = str(get_exe_dir() / "log")
    try:
        if sys.platform == "win32":
            os.startfile(log_dir)
        elif sys.platform == "darwin":
            subprocess.Popen(["open", log_dir])
        else:
            subprocess.Popen(["xdg-open", log_dir])
    except OSError as exc:
        # Covers a missing directory as well as a missing opener command.
        msg = f"Impossible d'ouvrir {log_dir} : {exc}"
        log.error(msg)
        self._log_gui(msg, "error")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the application window and run the
    # event loop until it exits.
    app = App()
    app.mainloop()
|