Files
Lanceur-Geco/git_updater.py
2026-03-24 15:57:44 +01:00

910 lines
36 KiB
Python

"""
Git Update Checker - GUI multi-repo.
Vérifie les mises à jour de plusieurs dépôts Git et propose de les télécharger.
Accès lecture seule uniquement (fetch/pull/checkout, jamais de push).
Tous les chemins sont relatifs à l'emplacement de l'exécutable.
"""
VERSION = "0.4"
import subprocess
import sys
import os
import configparser
import logging
import threading
import tkinter as tk
from tkinter import ttk, messagebox
from pathlib import Path
from datetime import datetime
import urllib.request
import urllib.error
# Forcer UTF-8 sur Windows
if sys.platform == "win32":
os.system("chcp 65001 >nul 2>&1")
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# ── Utilitaires ──────────────────────────────────────────────────────────────
def get_exe_dir():
if getattr(sys, "frozen", False):
return Path(sys.executable).parent
return Path(__file__).parent
def resolve_relative(path_str):
"""Résout un chemin relatif par rapport au dossier de l'exe."""
p = Path(path_str)
if not p.is_absolute():
p = get_exe_dir() / p
return p.resolve()
# ── Logging ──────────────────────────────────────────────────────────────────
def setup_logging():
"""Configure le logging dans un dossier log/ à côté de l'exe."""
log_dir = get_exe_dir() / "log"
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.log"
logger = logging.getLogger("GitUpdateChecker")
logger.setLevel(logging.DEBUG)
# Handler fichier
fh = logging.FileHandler(log_file, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S"))
logger.addHandler(fh)
# Nettoyage des vieux logs (garder 30 jours)
for old_log in sorted(log_dir.glob("*.log"))[:-30]:
try:
old_log.unlink()
except OSError:
pass
return logger
log = setup_logging()
def run_git(args, cwd=None):
# -c safe.directory=* : évite l'erreur "dubious ownership" sur clé USB
cmd = ["git", "-c", "safe.directory=*"] + args
log.debug(f"git {' '.join(args)} (cwd={cwd})")
try:
result = subprocess.run(
cmd, cwd=cwd, capture_output=True, text=True, timeout=30,
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
)
if result.returncode != 0 and result.stderr.strip():
log.warning(f"git {args[0]} erreur: {result.stderr.strip()}")
return result.returncode, result.stdout.strip(), result.stderr.strip()
except FileNotFoundError:
log.error("Git non trouve dans le PATH")
return -1, "", "Git n'est pas installe ou pas dans le PATH."
except subprocess.TimeoutExpired:
log.error(f"Timeout: git {' '.join(args)}")
return 1, "", "Timeout"
# ── Auto-update du programme ─────────────────────────────────────────────────
def _version_tuple(v):
"""Convertit '0.4' en (0, 4) pour comparaison."""
try:
return tuple(int(x) for x in v.strip().split("."))
except (ValueError, AttributeError):
return (0,)
def _get_self_update_config():
"""Lit la config [self-update] depuis config.ini. Retourne (url, exe_name) ou (None, None)."""
config_path = get_config_path()
if not config_path.exists():
return None, None
config = configparser.ConfigParser()
config.read(config_path, encoding="utf-8")
if not config.has_section("self-update"):
return None, None
url = config.get("self-update", "url", fallback="").strip().rstrip("/")
exe_name = config.get("self-update", "exe_name", fallback="GitUpdateChecker.exe").strip()
if not url:
return None, None
return url, exe_name
def check_self_update():
"""
Vérifie si une nouvelle version est disponible sur le serveur Gitea.
Télécharge version.txt via HTTP et compare avec VERSION locale.
Retourne (needs_update: bool, info: str).
"""
repo_url, _ = _get_self_update_config()
if not repo_url:
log.info("Auto-update: pas de section [self-update] dans config.ini, skip")
return False, ""
log.info("Auto-update: verification via HTTP...")
version_url = f"{repo_url}/raw/branch/master/version.txt"
try:
req = urllib.request.Request(version_url, headers={"User-Agent": "GitUpdateChecker"})
with urllib.request.urlopen(req, timeout=10) as resp:
remote_version = resp.read().decode("utf-8").strip()
except (urllib.error.URLError, OSError) as e:
log.warning(f"Auto-update: impossible de verifier la version distante: {e}")
return False, f"Impossible de contacter le serveur: {e}"
log.info(f"Auto-update: version locale={VERSION} distante={remote_version}")
if _version_tuple(remote_version) <= _version_tuple(VERSION):
log.info("Auto-update: programme a jour")
return False, ""
info = f"Version actuelle : {VERSION}\nVersion disponible : {remote_version}"
log.info(f"Auto-update: MAJ disponible - {remote_version}")
return True, info
def do_self_update():
"""
Télécharge le nouvel exe depuis le serveur Gitea.
Stratégie : télécharger dans .new, renommer l'exe actuel en .old, placer le nouveau.
Retourne (ok, message).
"""
repo_url, exe_name = _get_self_update_config()
if not repo_url:
return False, "Configuration [self-update] manquante"
is_frozen = getattr(sys, "frozen", False)
if not is_frozen:
log.warning("Auto-update: mode script, telechargement non supporte")
return False, "Auto-update uniquement supporte en mode .exe"
exe_path = Path(sys.executable)
exe_old_path = exe_path.with_suffix(".exe.old")
exe_new_path = exe_path.with_suffix(".exe.new")
# Telecharger le nouvel exe
download_url = f"{repo_url}/raw/branch/master/{exe_name}"
log.info(f"Auto-update: telechargement de {download_url}")
try:
req = urllib.request.Request(download_url, headers={"User-Agent": "GitUpdateChecker"})
with urllib.request.urlopen(req, timeout=60) as resp:
data = resp.read()
if len(data) < 1000:
log.error(f"Auto-update: fichier telecharge trop petit ({len(data)} octets)")
return False, "Le fichier telecharge semble invalide"
with open(exe_new_path, "wb") as f:
f.write(data)
log.info(f"Auto-update: telecharge {len(data)} octets -> {exe_new_path.name}")
except (urllib.error.URLError, OSError) as e:
log.error(f"Auto-update: echec telechargement: {e}")
if exe_new_path.exists():
try:
exe_new_path.unlink()
except OSError:
pass
return False, f"Erreur telechargement: {e}"
# Renommer : exe actuel -> .old
try:
if exe_old_path.exists():
exe_old_path.unlink()
exe_path.rename(exe_old_path)
log.info(f"Auto-update: {exe_path.name} -> {exe_old_path.name}")
except OSError as e:
log.error(f"Auto-update: impossible de renommer l'exe: {e}")
if exe_new_path.exists():
try:
exe_new_path.unlink()
except OSError:
pass
return False, f"Impossible de renommer l'exe: {e}"
# Renommer : .new -> exe
try:
exe_new_path.rename(exe_path)
log.info(f"Auto-update: {exe_new_path.name} -> {exe_path.name}")
except OSError as e:
log.error(f"Auto-update: impossible de placer le nouvel exe: {e}")
try:
exe_old_path.rename(exe_path)
log.info("Auto-update: ancien exe restaure")
except OSError:
pass
return False, f"Impossible de placer le nouvel exe: {e}"
return True, "Mise a jour reussie !\nLe programme va redemarrer."
def relaunch_program():
"""Relance le programme (nouvel exe) et quitte le processus actuel."""
if getattr(sys, "frozen", False):
exe_path = str(Path(sys.executable))
log.info(f"Auto-update: relance de {exe_path}")
# Lancer un batch qui attend 1s puis lance le nouvel exe et supprime l'ancien .old
bat_path = str(get_exe_dir() / "_update.bat")
bat_content = (
f'@echo off\n'
f'timeout /t 1 /nobreak >nul\n'
f'start "" "{exe_path}"\n'
f'del "{exe_path}.old" 2>nul\n'
f'del "%~f0"\n'
)
with open(bat_path, "w") as f:
f.write(bat_content)
subprocess.Popen(
["cmd", "/c", bat_path],
creationflags=subprocess.CREATE_NO_WINDOW,
)
else:
# Mode script : relancer python
log.info("Auto-update: relance du script")
subprocess.Popen([sys.executable, __file__])
# ── Configuration ────────────────────────────────────────────────────────────
def get_config_path():
return get_exe_dir() / "config.ini"
def load_repos():
"""Charge la liste des dépôts depuis config.ini."""
config_path = get_config_path()
config = configparser.ConfigParser()
repos = []
if not config_path.exists():
# Créer un config.ini exemple
config["repo:Exemple"] = {
"url": "http://192.168.1.235:3125/user/repo",
"path": "../MonRepo",
}
with open(config_path, "w", encoding="utf-8") as f:
f.write("; Configuration des depots Git a surveiller\n")
f.write("; Les chemins (path) sont relatifs a l'emplacement de l'exe\n")
f.write("; Ajouter autant de sections [repo:NomDuRepo] que necessaire\n\n")
config.write(f)
return repos
config.read(config_path, encoding="utf-8")
for section in config.sections():
if section.startswith("repo:"):
name = section[5:]
url = config.get(section, "url", fallback="").strip()
path = config.get(section, "path", fallback="").strip()
if url and path:
repos.append({"name": name, "url": url, "path": path})
return repos
# ── Logique Git ──────────────────────────────────────────────────────────────
def check_repo(repo):
"""Vérifie un dépôt et retourne son état."""
name = repo["name"]
url = repo["url"]
local_path = str(resolve_relative(repo["path"]))
log.info(f"[{name}] Verification - {url} -> {local_path}")
result = {
"name": name,
"url": url,
"local_path": local_path,
"relative_path": repo["path"],
"exists": False,
"up_to_date": False,
"error": None,
"commits": [],
"files": [],
"local_changes": [],
"local_only": False,
"branch": "",
"local_hash": "",
"remote_hash": "",
"needs_clone": False,
}
# Vérifier si le dossier existe et est un repo git
if not os.path.isdir(os.path.join(local_path, ".git")):
if os.path.isdir(local_path) and os.listdir(local_path):
result["error"] = "Le dossier existe mais n'est pas un depot git."
return result
result["needs_clone"] = True
return result
result["exists"] = True
# Branche courante
code, branch, _ = run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=local_path)
if code != 0:
result["error"] = "Impossible de lire la branche courante."
return result
result["branch"] = branch
# Fetch
code, _, err = run_git(["fetch", "origin"], cwd=local_path)
if code != 0:
result["error"] = f"Erreur fetch : {err}"
return result
# Comparer HEAD local vs origin
code, local_hash, _ = run_git(["rev-parse", "HEAD"], cwd=local_path)
if code != 0:
result["error"] = "Impossible de lire le commit local."
return result
code, remote_hash, _ = run_git(["rev-parse", f"origin/{branch}"], cwd=local_path)
if code != 0:
result["error"] = f"Impossible de trouver origin/{branch}."
return result
result["local_hash"] = local_hash[:8]
result["remote_hash"] = remote_hash[:8]
# Modifications locales
code, local_diff, _ = run_git(["diff", "--name-status", "HEAD"], cwd=local_path)
if code == 0 and local_diff:
for line in local_diff.splitlines():
parts = line.split("\t", 1)
if len(parts) == 2:
sc = parts[0].strip()
fn = parts[1].strip()
status_map = {"D": "Supprime localement", "M": "Modifie localement"}
result["local_changes"].append({
"status": status_map.get(sc[0], sc),
"status_char": sc[0],
"file": fn,
})
if local_hash == remote_hash and not result["local_changes"]:
log.info(f"[{name}] A jour ({local_hash[:8]})")
result["up_to_date"] = True
return result
if local_hash == remote_hash:
log.info(f"[{name}] {len(result['local_changes'])} fichier(s) locaux modifies/supprimes")
result["local_only"] = True
return result
# Nouveaux commits distants
code, log_out, _ = run_git(
["log", "--oneline", "--format=%h|%an|%ar|%s", f"HEAD..origin/{branch}"],
cwd=local_path,
)
if code == 0 and log_out:
for line in log_out.splitlines():
parts = line.split("|", 3)
if len(parts) == 4:
result["commits"].append({
"hash": parts[0], "author": parts[1],
"date": parts[2], "message": parts[3],
})
# Fichiers distants modifiés
code, diff_out, _ = run_git(
["diff", "--name-status", f"HEAD..origin/{branch}"],
cwd=local_path,
)
if code == 0 and diff_out:
for line in diff_out.splitlines():
parts = line.split("\t", 1)
if len(parts) == 2:
sc = parts[0].strip()
fn = parts[1].strip()
status_map = {"A": "Ajoute", "M": "Modifie", "D": "Supprime", "R": "Renomme"}
result["files"].append({
"status": status_map.get(sc[0], sc),
"status_char": sc[0],
"file": fn,
})
return result
def do_clone(repo):
"""Clone un dépôt."""
local_path = str(resolve_relative(repo["path"]))
log.info(f"Clonage: {repo['url']} -> {local_path}")
code, _, err = run_git(["clone", repo["url"], local_path])
if code == 0:
log.info(f"Clonage reussi: {local_path}")
else:
log.error(f"Clonage echoue: {err}")
return code == 0, err
def do_pull(local_path, branch):
"""Pull les mises à jour (lecture seule)."""
log.info(f"Pull: {local_path} (branche {branch})")
code, out, err = run_git(["pull", "origin", branch], cwd=local_path)
if code == 0:
log.info(f"Pull reussi: {local_path}")
else:
log.error(f"Pull echoue: {err}")
return code == 0, out, err
def do_restore(local_path):
"""Restaure les fichiers locaux modifiés/supprimés."""
log.info(f"Restauration: {local_path}")
code, _, err = run_git(["checkout", "--", "."], cwd=local_path)
if code == 0:
log.info(f"Restauration reussie: {local_path}")
else:
log.error(f"Restauration echouee: {err}")
return code == 0, err
# ── Interface graphique ──────────────────────────────────────────────────────
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title(f"Git Update Checker v{VERSION}")
self.geometry("820x600")
self.minsize(700, 450)
self.configure(bg="#1e1e2e")
self.repos_config = load_repos()
self.repo_results = []
log.info(f"=== Demarrage Git Update Checker v{VERSION} ===")
self._build_ui()
self.after(100, self._check_self_update_then_repos)
def _build_ui(self):
style = ttk.Style(self)
style.theme_use("clam")
# Couleurs sombres
bg = "#1e1e2e"
fg = "#cdd6f4"
accent = "#89b4fa"
green = "#a6e3a1"
yellow = "#f9e2af"
red = "#f38ba8"
style.configure("TFrame", background=bg)
style.configure("TLabel", background=bg, foreground=fg, font=("Segoe UI", 10))
style.configure("Title.TLabel", background=bg, foreground=fg, font=("Segoe UI", 14, "bold"))
style.configure("Status.TLabel", background=bg, foreground=accent, font=("Segoe UI", 9))
style.configure("TButton", font=("Segoe UI", 10))
style.configure("Green.TLabel", background=bg, foreground=green, font=("Segoe UI", 10, "bold"))
style.configure("Yellow.TLabel", background=bg, foreground=yellow, font=("Segoe UI", 10, "bold"))
style.configure("Red.TLabel", background=bg, foreground=red, font=("Segoe UI", 10, "bold"))
# Header
header = ttk.Frame(self)
header.pack(fill="x", padx=15, pady=(15, 5))
ttk.Label(header, text=f"Git Update Checker v{VERSION}", style="Title.TLabel").pack(side="left")
self.status_label = ttk.Label(header, text="Verification en cours...", style="Status.TLabel")
self.status_label.pack(side="right")
# Date
date_label = ttk.Label(self, text=datetime.now().strftime(" %d/%m/%Y %H:%M:%S"), style="Status.TLabel")
date_label.pack(anchor="w", padx=15)
# Panneau principal (PanedWindow vertical : cartes en haut, log en bas)
paned = ttk.PanedWindow(self, orient="vertical")
paned.pack(fill="both", expand=True, padx=15, pady=10)
# Zone scrollable pour les repos
container = ttk.Frame(paned)
paned.add(container, weight=3)
self.canvas = tk.Canvas(container, bg=bg, highlightthickness=0)
scrollbar = ttk.Scrollbar(container, orient="vertical", command=self.canvas.yview)
self.scroll_frame = ttk.Frame(self.canvas)
self.scroll_frame.bind("<Configure>", lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all")))
self.canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw")
self.canvas.configure(yscrollcommand=scrollbar.set)
self.canvas.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
# Scroll avec la molette
self.canvas.bind_all("<MouseWheel>", lambda e: self.canvas.yview_scroll(int(-1 * (e.delta / 120)), "units"))
# Panneau de log en bas
log_frame = tk.Frame(paned, bg="#181825")
paned.add(log_frame, weight=1)
log_header = tk.Frame(log_frame, bg="#181825")
log_header.pack(fill="x", padx=8, pady=(6, 2))
tk.Label(log_header, text="Journal des operations", bg="#181825", fg="#a6adc8", font=("Segoe UI", 9, "bold")).pack(side="left")
tk.Button(log_header, text="Effacer", bg="#313244", fg="#cdd6f4", bd=0, font=("Segoe UI", 8),
command=self._clear_log_panel, activebackground="#45475a", activeforeground="#cdd6f4").pack(side="right")
self.log_text = tk.Text(log_frame, bg="#11111b", fg="#cdd6f4", font=("Consolas", 9),
height=8, bd=0, highlightthickness=0, wrap="word",
state="disabled", padx=8, pady=4)
log_scroll = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview)
self.log_text.configure(yscrollcommand=log_scroll.set)
self.log_text.pack(side="left", fill="both", expand=True, padx=(8, 0), pady=(0, 8))
log_scroll.pack(side="right", fill="y", padx=(0, 8), pady=(0, 8))
# Tags couleur pour le log GUI
self.log_text.tag_configure("info", foreground="#cdd6f4")
self.log_text.tag_configure("success", foreground="#a6e3a1")
self.log_text.tag_configure("warning", foreground="#f9e2af")
self.log_text.tag_configure("error", foreground="#f38ba8")
self.log_text.tag_configure("file_add", foreground="#a6e3a1")
self.log_text.tag_configure("file_mod", foreground="#f9e2af")
self.log_text.tag_configure("file_del", foreground="#f38ba8")
self.log_text.tag_configure("dim", foreground="#6c7086")
# Boutons en bas
btn_frame = ttk.Frame(self)
btn_frame.pack(fill="x", padx=15, pady=(0, 15))
self.btn_refresh = ttk.Button(btn_frame, text="Rafraichir", command=self._start_check)
self.btn_refresh.pack(side="left", padx=(0, 10))
self.btn_update_all = ttk.Button(btn_frame, text="Tout mettre a jour", command=self._update_all)
self.btn_update_all.pack(side="left")
self.btn_update_all.state(["disabled"])
ttk.Button(btn_frame, text="Ouvrir config.ini", command=self._open_config).pack(side="right")
self.btn_open_log = ttk.Button(btn_frame, text="Ouvrir les logs", command=self._open_log)
self.btn_open_log.pack(side="right", padx=(0, 10))
def _check_self_update_then_repos(self):
"""Vérifie d'abord la MAJ du programme, puis les repos."""
self.status_label.configure(text="Verification auto-update...")
def work():
needs, info = check_self_update()
self.after(0, lambda: self._handle_self_update(needs, info))
threading.Thread(target=work, daemon=True).start()
def _log_gui(self, message, tag="info"):
"""Ajoute une ligne dans le panneau de log."""
def _append():
self.log_text.configure(state="normal")
timestamp = datetime.now().strftime("%H:%M:%S")
self.log_text.insert("end", f"[{timestamp}] ", "dim")
self.log_text.insert("end", f"{message}\n", tag)
self.log_text.see("end")
self.log_text.configure(state="disabled")
# Appel thread-safe
if threading.current_thread() is threading.main_thread():
_append()
else:
self.after(0, _append)
def _clear_log_panel(self):
self.log_text.configure(state="normal")
self.log_text.delete("1.0", "end")
self.log_text.configure(state="disabled")
def _handle_self_update(self, needs_update, info):
"""Gère le résultat de l'auto-update."""
if needs_update:
answer = messagebox.askyesno(
"Mise a jour du programme",
f"Une mise a jour du programme est disponible !\n\n{info}\n\nMettre a jour maintenant ?",
)
if answer:
self.status_label.configure(text="Mise a jour du programme...")
def work():
ok, msg = do_self_update()
self.after(0, lambda: self._self_update_done(ok, msg))
threading.Thread(target=work, daemon=True).start()
return
self._start_check()
def _self_update_done(self, ok, msg):
"""Callback après auto-update."""
if ok:
messagebox.showinfo("Auto-update", msg)
log.info("Auto-update appliquee, relance...")
relaunch_program()
self.destroy()
return
else:
messagebox.showwarning("Auto-update", msg)
self._start_check()
def _start_check(self):
"""Lance la vérification dans un thread."""
self.btn_refresh.state(["disabled"])
self.btn_update_all.state(["disabled"])
self.status_label.configure(text="Verification en cours...")
self._log_gui("Verification des depots...", "info")
self.repos_config = load_repos()
if not self.repos_config:
self._clear_cards()
self.status_label.configure(text="Aucun depot configure")
self._show_no_repos()
self.btn_refresh.state(["!disabled"])
return
threading.Thread(target=self._check_all, daemon=True).start()
def _check_all(self):
"""Vérifie tous les repos (dans un thread)."""
results = []
for repo in self.repos_config:
results.append(check_repo(repo))
self.repo_results = results
self.after(0, self._display_results)
def _display_results(self):
"""Affiche les résultats dans la GUI."""
self._clear_cards()
has_any_updates = False
for res in self.repo_results:
card = self._create_card(res)
card.pack(fill="x", pady=5)
if not res["up_to_date"] and not res.get("error"):
has_any_updates = True
total = len(self.repo_results)
up = sum(1 for r in self.repo_results if r["up_to_date"])
log.info(f"Resultat: {up}/{total} depots a jour")
self._log_gui(f"Verification terminee : {up}/{total} depots a jour", "success" if up == total else "warning")
self.status_label.configure(text=f"{up}/{total} depots a jour")
self.btn_refresh.state(["!disabled"])
if has_any_updates:
self.btn_update_all.state(["!disabled"])
def _clear_cards(self):
for w in self.scroll_frame.winfo_children():
w.destroy()
def _create_card(self, res):
"""Crée une carte pour un dépôt."""
bg_card = "#313244"
fg = "#cdd6f4"
green = "#a6e3a1"
yellow = "#f9e2af"
red = "#f38ba8"
cyan = "#89dceb"
dim = "#6c7086"
card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a")
# Header de la carte
top = tk.Frame(card, bg=bg_card)
top.pack(fill="x", padx=12, pady=(10, 5))
tk.Label(top, text=res["name"], bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(side="left")
if res.get("error"):
tk.Label(top, text="ERREUR", bg=bg_card, fg=red, font=("Segoe UI", 9, "bold")).pack(side="right")
elif res.get("needs_clone"):
tk.Label(top, text="A CLONER", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")
elif res["up_to_date"]:
tk.Label(top, text="A JOUR", bg=bg_card, fg=green, font=("Segoe UI", 9, "bold")).pack(side="right")
else:
count = len(res["commits"]) + len(res["local_changes"])
tk.Label(top, text=f"MAJ DISPO ({count})", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")
# Infos
info = tk.Frame(card, bg=bg_card)
info.pack(fill="x", padx=12)
tk.Label(info, text=f"Chemin : {res['relative_path']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w")
if res["branch"]:
tk.Label(info, text=f"Branche : {res['branch']} | {res['local_hash']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w")
# Erreur
if res.get("error"):
tk.Label(card, text=f" {res['error']}", bg=bg_card, fg=red, font=("Segoe UI", 9), anchor="w").pack(fill="x", padx=12, pady=(5, 0))
# Commits distants
if res["commits"]:
tk.Label(card, text=f"Nouveaux commits ({len(res['commits'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(8, 2))
for c in res["commits"][:5]:
line_frame = tk.Frame(card, bg=bg_card)
line_frame.pack(fill="x", padx=20)
tk.Label(line_frame, text=c["hash"], bg=bg_card, fg=cyan, font=("Consolas", 9)).pack(side="left")
tk.Label(line_frame, text=f" {c['message']}", bg=bg_card, fg=fg, font=("Segoe UI", 9), anchor="w").pack(side="left", fill="x")
if len(res["commits"]) > 5:
tk.Label(card, text=f" ... et {len(res['commits']) - 5} autres commits", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)
# Fichiers distants modifiés
if res["files"]:
tk.Label(card, text=f"Fichiers distants modifies ({len(res['files'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2))
color_map = {"A": green, "M": yellow, "D": red, "R": cyan}
for f in res["files"][:8]:
line_frame = tk.Frame(card, bg=bg_card)
line_frame.pack(fill="x", padx=20)
c = color_map.get(f["status_char"], fg)
tk.Label(line_frame, text=f"[{f['status']:>9}]", bg=bg_card, fg=c, font=("Consolas", 9)).pack(side="left")
tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left")
if len(res["files"]) > 8:
tk.Label(card, text=f" ... et {len(res['files']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)
# Fichiers locaux manquants
if res["local_changes"]:
tk.Label(card, text=f"Fichiers locaux a restaurer ({len(res['local_changes'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2))
for f in res["local_changes"][:8]:
line_frame = tk.Frame(card, bg=bg_card)
line_frame.pack(fill="x", padx=20)
c = red if f["status_char"] == "D" else yellow
tk.Label(line_frame, text=f"[{f['status']:>20}]", bg=bg_card, fg=c, font=("Consolas", 8)).pack(side="left")
tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left")
if len(res["local_changes"]) > 8:
tk.Label(card, text=f" ... et {len(res['local_changes']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)
# Bouton d'action
if not res["up_to_date"] and not res.get("error"):
btn_frame = tk.Frame(card, bg=bg_card)
btn_frame.pack(fill="x", padx=12, pady=(8, 0))
if res.get("needs_clone"):
btn = ttk.Button(btn_frame, text="Cloner le depot", command=lambda r=res: self._do_clone(r))
else:
btn = ttk.Button(btn_frame, text="Mettre a jour", command=lambda r=res: self._do_update(r))
btn.pack(side="left")
# Stocker ref du bouton pour le désactiver après
res["_btn"] = btn
# Padding bas
tk.Frame(card, bg=bg_card, height=10).pack()
return card
def _do_update(self, res):
"""Met à jour un dépôt (pull + restore)."""
name = res["name"]
log.info(f"[{name}] MAJ unitaire demandee")
self._log_gui(f"[{name}] Mise a jour en cours...", "info")
if "_btn" in res:
res["_btn"].state(["disabled"])
def work():
messages = []
success = True
local_path = res["local_path"]
branch = res["branch"]
tag_map = {"A": "file_add", "M": "file_mod", "D": "file_del", "R": "file_mod"}
if res["commits"]:
log.info(f"[{name}] Pull de {len(res['commits'])} commit(s)...")
self._log_gui(f"[{name}] Telechargement de {len(res['commits'])} commit(s)...", "info")
ok, out, err = do_pull(local_path, branch)
if ok:
msg = f"Pull OK : {len(res['commits'])} commits telecharges."
log.info(f"[{name}] {msg}")
self._log_gui(f"[{name}] {msg}", "success")
# Logger chaque fichier distant
for f in res.get("files", []):
tag = tag_map.get(f["status_char"], "info")
self._log_gui(f" [{f['status']:>9}] {f['file']}", tag)
messages.append(msg)
else:
msg = f"Erreur pull : {err}"
log.error(f"[{name}] {msg}")
self._log_gui(f"[{name}] {msg}", "error")
messages.append(msg)
success = False
if res["local_changes"]:
log.info(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...")
self._log_gui(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...", "info")
ok, err = do_restore(local_path)
if ok:
msg = f"Restauration OK : {len(res['local_changes'])} fichiers restaures."
log.info(f"[{name}] {msg}")
self._log_gui(f"[{name}] {msg}", "success")
# Logger chaque fichier restauré
for f in res["local_changes"]:
tag = tag_map.get(f["status_char"], "info")
self._log_gui(f" [Restaure] {f['file']}", tag)
messages.append(msg)
else:
msg = f"Erreur restauration : {err}"
log.error(f"[{name}] {msg}")
self._log_gui(f"[{name}] {msg}", "error")
messages.append(msg)
success = False
status = "SUCCES" if success else "ECHEC"
log.info(f"[{name}] MAJ unitaire terminee - {status}")
self._log_gui(f"[{name}] Termine - {status}", "success" if success else "error")
self.after(0, lambda: self._show_update_result(res, messages, success))
threading.Thread(target=work, daemon=True).start()
def _do_clone(self, res):
"""Clone un dépôt."""
name = res["name"]
log.info(f"[{name}] Clonage demande - {res['url']}")
self._log_gui(f"[{name}] Clonage en cours depuis {res['url']}...", "info")
if "_btn" in res:
res["_btn"].state(["disabled"])
repo = {"url": res["url"], "path": res["relative_path"]}
def work():
ok, err = do_clone(repo)
if ok:
msg = f"Depot '{name}' clone avec succes !"
log.info(f"[{name}] {msg}")
self._log_gui(f"[{name}] {msg}", "success")
else:
msg = f"Erreur de clonage : {err}"
log.error(f"[{name}] {msg}")
self._log_gui(f"[{name}] {msg}", "error")
self.after(0, lambda: messagebox.showinfo("Clonage", msg))
self.after(100, self._start_check)
threading.Thread(target=work, daemon=True).start()
def _show_update_result(self, res, messages, success):
title = "Mise a jour" if success else "Erreur"
messagebox.showinfo(title, f"{res['name']}\n\n" + "\n".join(messages))
self._start_check()
def _update_all(self):
"""Met à jour tous les dépôts qui ont des MAJ."""
to_update = [r for r in self.repo_results if not r["up_to_date"] and not r.get("error") and not r.get("needs_clone")]
log.info(f"MAJ globale demandee - {len(to_update)} depot(s) a mettre a jour")
self._log_gui(f"MAJ globale : {len(to_update)} depot(s) a mettre a jour", "warning")
for res in to_update:
self._do_update(res)
def _show_no_repos(self):
bg_card = "#313244"
fg = "#cdd6f4"
dim = "#6c7086"
card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a")
card.pack(fill="x", pady=5)
tk.Label(card, text="Aucun depot configure", bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(padx=12, pady=(12, 5))
tk.Label(card, text="Edite config.ini pour ajouter des depots :", bg=bg_card, fg=dim, font=("Segoe UI", 9)).pack(padx=12)
example = (
"[repo:MonRepo]\n"
"url = http://192.168.1.235:3125/user/repo\n"
"path = ../MonRepo"
)
tk.Label(card, text=example, bg="#1e1e2e", fg="#a6e3a1", font=("Consolas", 9), justify="left", padx=10, pady=8).pack(padx=12, pady=8, fill="x")
tk.Frame(card, bg=bg_card, height=10).pack()
def _open_config(self):
config_path = str(get_config_path())
if sys.platform == "win32":
os.startfile(config_path)
def _open_log(self):
log_dir = str(get_exe_dir() / "log")
if sys.platform == "win32":
os.startfile(log_dir)
if __name__ == "__main__":
app = App()
app.mainloop()