Files
Lanceur-Geco/git_updater.py
2026-03-25 07:34:53 +01:00

1040 lines
42 KiB
Python

"""
Git Update Checker - GUI multi-repo.
Vérifie les mises à jour de plusieurs dépôts Git et propose de les télécharger.
Accès lecture seule uniquement (fetch/pull/checkout, jamais de push).
Tous les chemins sont relatifs à l'emplacement de l'exécutable.
"""
VERSION = "0.6.6"
import subprocess
import sys
import os
import configparser
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import ttk, messagebox
from pathlib import Path
from datetime import datetime
import urllib.request
import urllib.error
# Force UTF-8 console output on Windows.
if sys.platform == "win32":
    # Switch the console code page to 65001 (UTF-8); command output is discarded.
    os.system("chcp 65001 >nul 2>&1")
    try:
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")
        sys.stderr.reconfigure(encoding="utf-8", errors="replace")
    except Exception:
        # Best effort: any failure to reconfigure the streams is ignored.
        pass
# ── Utilitaires ──────────────────────────────────────────────────────────────
def get_exe_dir():
    """Return the directory that contains the running program.

    When ``sys.frozen`` is set (packaged executable) this is the folder of
    ``sys.executable``; otherwise it is the folder of this source file.
    """
    is_frozen = getattr(sys, "frozen", False)
    anchor = sys.executable if is_frozen else __file__
    return Path(anchor).parent
def resolve_relative(path_str):
    """Resolve *path_str* to an absolute path.

    A relative path is anchored at the program directory (``get_exe_dir()``);
    an absolute path is used as-is. The result is passed through
    ``Path.resolve()``.
    """
    candidate = Path(path_str)
    anchored = candidate if candidate.is_absolute() else get_exe_dir() / candidate
    return anchored.resolve()
# ── Logging ──────────────────────────────────────────────────────────────────
def setup_logging():
    """Configure and return the ``GitUpdateChecker`` logger.

    Log records go to ``log/<YYYY-MM-DD>.log`` next to the program. After the
    handler is installed, only the 30 most recent ``*.log`` files (by name,
    which sorts chronologically for date-named files) are kept.

    The function is idempotent: if the logger already has a handler it is
    returned unchanged, so a second call never duplicates log lines.
    If the log folder or file cannot be created (e.g. read-only media), a
    ``NullHandler`` is installed instead so the program still starts.

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger("GitUpdateChecker")
    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    log_dir = get_exe_dir() / "log"
    try:
        log_dir.mkdir(exist_ok=True)
        log_file = log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.log"
        fh = logging.FileHandler(log_file, encoding="utf-8")
    except OSError:
        # Logging to disk is a convenience, not a requirement.
        logger.addHandler(logging.NullHandler())
        return logger

    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S"))
    logger.addHandler(fh)

    # Prune old logs: everything except the 30 newest files.
    for old_log in sorted(log_dir.glob("*.log"))[:-30]:
        try:
            old_log.unlink()
        except OSError:
            pass
    return logger


log = setup_logging()
def run_git(args, cwd=None, timeout=30):
    """Run a git command and return ``(returncode, stdout, stderr)``.

    Args:
        args: git arguments (without the leading ``git``).
        cwd: working directory for the command, or ``None``.
        timeout: maximum run time in seconds.

    Returns:
        tuple: ``(code, stdout, stderr)`` with both streams stripped.
        ``code`` is -1 when the git executable cannot be started and 1 on
        timeout (stderr is then ``"Timeout"``).

    Output is decoded explicitly as UTF-8 with replacement of undecodable
    bytes, instead of the locale code page, so a stray byte can never raise
    ``UnicodeDecodeError``.
    """
    args = list(args)
    # -c safe.directory=* : avoids the "dubious ownership" error on USB drives
    cmd = ["git", "-c", "safe.directory=*"] + args
    log.debug(f"git {' '.join(args)} (cwd={cwd})")
    try:
        result = subprocess.run(
            cmd, cwd=cwd, capture_output=True, text=True,
            encoding="utf-8", errors="replace", timeout=timeout,
            creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
        )
    except FileNotFoundError:
        log.error("Git non trouve dans le PATH")
        return -1, "", "Git n'est pas installe ou pas dans le PATH."
    except subprocess.TimeoutExpired:
        log.error(f"Timeout: git {' '.join(args)}")
        return 1, "", "Timeout"

    stdout = result.stdout.strip()
    stderr = result.stderr.strip()
    if result.returncode != 0 and stderr:
        # args[:1] keeps this safe even for an empty argument list.
        log.warning(f"git {' '.join(args[:1])} erreur: {stderr}")
    return result.returncode, stdout, stderr
# ── Auto-update du programme ─────────────────────────────────────────────────
def _version_tuple(v):
"""Convertit '0.4' en (0, 4) pour comparaison."""
try:
return tuple(int(x) for x in v.strip().split("."))
except (ValueError, AttributeError):
return (0,)
def _get_self_update_config():
    """Read the ``[self-update]`` section of config.ini.

    Returns:
        tuple: ``(url, exe_name, branch)``. ``url`` has surrounding
        whitespace and trailing slashes removed; ``exe_name`` defaults to
        ``GitUpdateChecker.exe`` and ``branch`` to ``master``.
        ``(None, None, None)`` is returned when the file or section is
        missing, when ``url`` is empty, or when the file cannot be parsed.
    """
    missing = (None, None, None)
    config_path = get_config_path()
    if not config_path.exists():
        return missing

    config = configparser.ConfigParser()
    try:
        config.read(config_path, encoding="utf-8")
        if not config.has_section("self-update"):
            return missing
        # config.get() can raise too: a literal '%' in a value triggers
        # an interpolation error.
        url = config.get("self-update", "url", fallback="").strip().rstrip("/")
        exe_name = config.get("self-update", "exe_name", fallback="GitUpdateChecker.exe").strip()
        branch = config.get("self-update", "branch", fallback="master").strip()
    except (configparser.Error, UnicodeDecodeError, OSError) as e:
        # A broken config must not kill the caller's worker thread.
        log.warning(f"Auto-update: config.ini illisible: {e}")
        return missing

    if not url:
        return missing
    return url, exe_name, branch
def check_self_update():
    """Check over HTTP whether a newer program version is published.

    Downloads ``<url>/raw/branch/<branch>/version.txt`` (URL and branch come
    from the ``[self-update]`` configuration) and compares it with the local
    ``VERSION`` using ``_version_tuple``.

    Returns:
        tuple: ``(needs_update, info)``. ``info`` is a human-readable text
        when an update is available or when the server could not be reached,
        and an empty string otherwise.
    """
    import http.client  # local import: only needed for the exception type

    repo_url, _, branch = _get_self_update_config()
    if not repo_url:
        log.info("Auto-update: pas de section [self-update] dans config.ini, skip")
        return False, ""

    log.info("Auto-update: verification via HTTP...")
    version_url = f"{repo_url}/raw/branch/{branch}/version.txt"
    try:
        req = urllib.request.Request(version_url, headers={"User-Agent": "GitUpdateChecker"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            # utf-8-sig drops a byte-order mark if the file was saved with one.
            remote_version = resp.read().decode("utf-8-sig").strip()
    except (urllib.error.URLError, OSError, ValueError, http.client.HTTPException) as e:
        # ValueError covers malformed URLs and undecodable bodies;
        # HTTPException covers protocol-level failures.
        log.warning(f"Auto-update: impossible de verifier la version distante: {e}")
        return False, f"Impossible de contacter le serveur: {e}"

    log.info(f"Auto-update: version locale={VERSION} distante={remote_version}")
    if _version_tuple(remote_version) <= _version_tuple(VERSION):
        log.info("Auto-update: programme a jour")
        return False, ""

    info = f"Version actuelle : {VERSION}\nVersion disponible : {remote_version}"
    log.info(f"Auto-update: MAJ disponible - {remote_version}")
    return True, info
def _discard_file(path):
    """Delete *path* if present, ignoring filesystem errors (best-effort cleanup)."""
    try:
        path.unlink()
    except OSError:
        pass


def do_self_update():
    """Download the new executable and swap it in place of the running one.

    Strategy: download to ``<exe>.new``, rename the current executable to
    ``<exe>.old``, then rename ``.new`` to the executable name. If the last
    step fails, the ``.old`` file is renamed back.

    The download is rejected when it is smaller than 1000 bytes or does not
    start with the ``MZ`` signature of a Windows executable, so an HTML
    error page can never replace the program.

    Returns:
        tuple: ``(ok, message)``.
    """
    import http.client  # local import: only needed for the exception type

    repo_url, exe_name, branch = _get_self_update_config()
    if not repo_url:
        return False, "Configuration [self-update] manquante"

    if not getattr(sys, "frozen", False):
        log.warning("Auto-update: mode script, telechargement non supporte")
        return False, "Auto-update uniquement supporte en mode .exe"

    exe_path = Path(sys.executable)
    exe_old_path = exe_path.with_suffix(".exe.old")
    exe_new_path = exe_path.with_suffix(".exe.new")

    # Download the new executable
    download_url = f"{repo_url}/raw/branch/{branch}/{exe_name}"
    log.info(f"Auto-update: telechargement de {download_url}")
    total_bytes = 0
    signature = b""
    try:
        req = urllib.request.Request(download_url, headers={"User-Agent": "GitUpdateChecker"})
        with urllib.request.urlopen(req, timeout=60) as resp, open(exe_new_path, "wb") as f:
            while True:
                chunk = resp.read(65536)  # 64 KiB per block
                if not chunk:
                    break
                if len(signature) < 2:
                    signature += chunk[: 2 - len(signature)]
                f.write(chunk)
                total_bytes += len(chunk)
    except (urllib.error.URLError, OSError, ValueError, http.client.HTTPException) as e:
        log.error(f"Auto-update: echec telechargement: {e}")
        _discard_file(exe_new_path)
        return False, f"Erreur telechargement: {e}"

    if total_bytes < 1000:
        log.error(f"Auto-update: fichier telecharge trop petit ({total_bytes} octets)")
        _discard_file(exe_new_path)
        return False, "Le fichier telecharge semble invalide"
    if signature != b"MZ":
        log.error("Auto-update: le fichier telecharge n'est pas un executable (signature MZ absente)")
        _discard_file(exe_new_path)
        return False, "Le fichier telecharge semble invalide"
    log.info(f"Auto-update: telecharge {total_bytes} octets -> {exe_new_path.name}")

    # Remove the "Mark of the Web" (Zone.Identifier alternate stream) that
    # Windows may attach to downloaded files and that would block DLL loading.
    try:
        os.remove(str(exe_new_path) + ":Zone.Identifier")
        log.info("Auto-update: Zone.Identifier supprime")
    except FileNotFoundError:
        pass  # no mark present, nothing to do
    except OSError as e:
        log.warning(f"Auto-update: impossible de supprimer Zone.Identifier: {e}")

    # Rename: current exe -> .old
    try:
        if exe_old_path.exists():
            exe_old_path.unlink()
        exe_path.rename(exe_old_path)
        log.info(f"Auto-update: {exe_path.name} -> {exe_old_path.name}")
    except OSError as e:
        log.error(f"Auto-update: impossible de renommer l'exe: {e}")
        _discard_file(exe_new_path)
        return False, f"Impossible de renommer l'exe: {e}"

    # Rename: .new -> exe (roll back to the old exe on failure)
    try:
        exe_new_path.rename(exe_path)
        log.info(f"Auto-update: {exe_new_path.name} -> {exe_path.name}")
    except OSError as e:
        log.error(f"Auto-update: impossible de placer le nouvel exe: {e}")
        try:
            exe_old_path.rename(exe_path)
            log.info("Auto-update: ancien exe restaure")
        except OSError:
            pass
        return False, f"Impossible de placer le nouvel exe: {e}"

    return True, "Mise a jour reussie !\nLe programme va redemarrer."
def relaunch_program():
    """Start a fresh instance of the program.

    Script mode: spawn the current interpreter on this file.
    Frozen mode: write ``_update.bat`` next to the executable and run it
    through ``cmd /c`` without a window. The batch waits one second, starts
    the executable, deletes ``<exe>.old`` and finally deletes itself.
    This function does not terminate the current process itself.
    """
    if not getattr(sys, "frozen", False):
        # Script mode: relaunch python on this file
        log.info("Auto-update: relance du script")
        subprocess.Popen([sys.executable, __file__])
        return

    exe_path = str(Path(sys.executable))
    log.info(f"Auto-update: relance de {exe_path}")

    script_lines = [
        '@echo off',
        'timeout /t 1 /nobreak >nul',
        f'start "" "{exe_path}"',
        f'del "{exe_path}.old" 2>nul',
        'del "%~f0"',
    ]
    bat_content = "\n".join(script_lines) + "\n"
    bat_path = str(get_exe_dir() / "_update.bat")
    with open(bat_path, "w", encoding="mbcs") as bat_file:
        bat_file.write(bat_content)

    subprocess.Popen(
        ["cmd", "/c", bat_path],
        creationflags=subprocess.CREATE_NO_WINDOW,
    )
# ── Configuration ────────────────────────────────────────────────────────────
def get_config_path():
    """Return the path of ``config.ini`` inside the program directory."""
    return Path(get_exe_dir(), "config.ini")
def load_repos():
    """Load the list of repositories from config.ini.

    Every ``[repo:<name>]`` section with a non-empty ``url`` and ``path``
    yields one entry. When config.ini does not exist, an example file is
    written and an empty list is returned.

    A file that cannot be written, read or parsed is logged and results in
    an empty list (or a skipped section) instead of an exception, so the
    GUI can still start.

    Returns:
        list[dict]: dicts with keys ``name``, ``url`` and ``path``.
    """
    config_path = get_config_path()
    config = configparser.ConfigParser()

    if not config_path.exists():
        # Create an example config.ini
        config["repo:Exemple"] = {
            "url": "http://192.168.1.235:3125/user/repo",
            "path": "../MonRepo",
        }
        try:
            with open(config_path, "w", encoding="utf-8") as f:
                f.write("; Configuration des depots Git a surveiller\n")
                f.write("; Les chemins (path) sont relatifs a l'emplacement de l'exe\n")
                f.write("; Ajouter autant de sections [repo:NomDuRepo] que necessaire\n\n")
                config.write(f)
        except OSError as e:
            log.warning(f"Impossible de creer config.ini: {e}")
        return []

    try:
        config.read(config_path, encoding="utf-8")
    except (configparser.Error, UnicodeDecodeError, OSError) as e:
        log.error(f"config.ini illisible: {e}")
        return []

    repos = []
    for section in config.sections():
        if not section.startswith("repo:"):
            continue
        name = section[5:]
        try:
            url = config.get(section, "url", fallback="").strip()
            path = config.get(section, "path", fallback="").strip()
        except configparser.Error as e:
            # e.g. a literal '%' in a value triggers an interpolation error
            log.warning(f"[{name}] section ignoree: {e}")
            continue
        if url and path:
            repos.append({"name": name, "url": url, "path": path})
    return repos
# ── Logique Git ──────────────────────────────────────────────────────────────
def _parse_name_status(diff_output, status_map):
    """Parse ``git diff --name-status`` output into a list of change dicts.

    Each dict has ``status`` (label from *status_map*, or the raw code when
    unknown), ``status_char`` (first letter of the code) and ``file``.
    For rename/copy lines, which carry two tab-separated paths, ``file`` is
    rendered as ``old -> new``. Lines without a tab or without a status code
    are skipped.
    """
    changes = []
    for line in diff_output.splitlines():
        code, tab, names = line.partition("\t")
        code = code.strip()
        if not tab or not code:
            continue
        changes.append({
            "status": status_map.get(code[0], code),
            "status_char": code[0],
            "file": names.strip().replace("\t", " -> "),
        })
    return changes


def check_repo(repo):
    """Inspect one configured repository and return its state.

    Args:
        repo: dict with keys ``name``, ``url`` and ``path`` (path may be
            relative to the program directory).

    Returns:
        dict: state with keys ``name``, ``url``, ``local_path``,
        ``relative_path``, ``exists``, ``up_to_date``, ``error``,
        ``commits``, ``files``, ``local_changes``, ``local_only``,
        ``branch``, ``local_hash``, ``remote_hash``, ``needs_clone`` and
        ``offline``. The function returns early (with ``error`` or
        ``needs_clone`` set) as soon as a step cannot proceed.
    """
    name = repo["name"]
    url = repo["url"]
    local_path = str(resolve_relative(repo["path"]))
    log.info(f"[{name}] Verification - {url} -> {local_path}")

    result = {
        "name": name,
        "url": url,
        "local_path": local_path,
        "relative_path": repo["path"],
        "exists": False,
        "up_to_date": False,
        "error": None,
        "commits": [],
        "files": [],
        "local_changes": [],
        "local_only": False,
        "branch": "",
        "local_hash": "",
        "remote_hash": "",
        "needs_clone": False,
        "offline": False,
    }

    # The folder must exist and be a git repository
    if not os.path.isdir(os.path.join(local_path, ".git")):
        if os.path.isdir(local_path) and os.listdir(local_path):
            result["error"] = "Le dossier existe mais n'est pas un depot git."
            return result
        # Missing or empty folder: it can be cloned
        result["needs_clone"] = True
        return result
    result["exists"] = True

    # Current branch
    code, branch, _ = run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=local_path)
    if code != 0:
        result["error"] = "Impossible de lire la branche courante."
        return result
    result["branch"] = branch

    # Keep origin in sync with the URL from config.ini
    run_git(["remote", "set-url", "origin", url], cwd=local_path)

    # Fetch (also detects an unreachable remote)
    code, _, err = run_git(["fetch", "origin"], cwd=local_path)
    if code != 0:
        offline_keywords = (
            "could not resolve", "connection refused", "unable to connect",
            "timed out", "the remote end hung up",
        )
        lowered = err.lower()
        if any(kw in lowered for kw in offline_keywords):
            log.warning(f"[{name}] Remote inaccessible: {url}")
            result["error"] = "Depot hors ligne (remote inaccessible)"
            result["offline"] = True
        else:
            result["error"] = f"Erreur fetch : {err}"
        return result

    # Compare local HEAD with origin
    code, local_hash, _ = run_git(["rev-parse", "HEAD"], cwd=local_path)
    if code != 0:
        result["error"] = "Impossible de lire le commit local."
        return result
    code, remote_hash, _ = run_git(["rev-parse", f"origin/{branch}"], cwd=local_path)
    if code != 0:
        result["error"] = f"Impossible de trouver origin/{branch}."
        return result
    result["local_hash"] = local_hash[:8]
    result["remote_hash"] = remote_hash[:8]

    # Local modifications (tracked files that differ from HEAD)
    code, local_diff, _ = run_git(["diff", "--name-status", "HEAD"], cwd=local_path)
    if code == 0 and local_diff:
        result["local_changes"] = _parse_name_status(
            local_diff,
            {"D": "Supprime localement", "M": "Modifie localement"},
        )

    if local_hash == remote_hash:
        if result["local_changes"]:
            log.info(f"[{name}] {len(result['local_changes'])} fichier(s) locaux modifies/supprimes")
            result["local_only"] = True
        else:
            log.info(f"[{name}] A jour ({local_hash[:8]})")
            result["up_to_date"] = True
        return result

    # New remote commits. Fields are separated by the ASCII unit separator
    # (%x1f) so that a '|' in an author name or subject cannot break parsing.
    field_sep = "\x1f"
    code, log_out, _ = run_git(
        ["log", "--format=%h%x1f%an%x1f%ar%x1f%s", f"HEAD..origin/{branch}"],
        cwd=local_path,
    )
    if code == 0 and log_out:
        for line in log_out.splitlines():
            parts = line.split(field_sep, 3)
            if len(parts) == 4:
                result["commits"].append({
                    "hash": parts[0], "author": parts[1],
                    "date": parts[2], "message": parts[3],
                })

    # Files changed on the remote side
    code, diff_out, _ = run_git(
        ["diff", "--name-status", f"HEAD..origin/{branch}"],
        cwd=local_path,
    )
    if code == 0 and diff_out:
        result["files"] = _parse_name_status(
            diff_out,
            {"A": "Ajoute", "M": "Modifie", "D": "Supprime", "R": "Renomme"},
        )

    return result
def do_clone(repo):
    """Clone ``repo["url"]`` into the resolved ``repo["path"]``.

    Returns:
        tuple: ``(ok, stderr)`` where ``ok`` is True when git exited with 0.
    """
    target = str(resolve_relative(repo["path"]))
    log.info(f"Clonage: {repo['url']} -> {target}")
    code, _, err = run_git(["clone", repo["url"], target], timeout=300)
    succeeded = code == 0
    if succeeded:
        log.info(f"Clonage reussi: {target}")
    else:
        log.error(f"Clonage echoue: {err}")
    return succeeded, err
def do_pull(local_path, branch):
    """Run ``git pull origin <branch>`` in *local_path* (120 s timeout).

    Returns:
        tuple: ``(ok, stdout, stderr)`` where ``ok`` is True when git exited with 0.
    """
    log.info(f"Pull: {local_path} (branche {branch})")
    code, out, err = run_git(["pull", "origin", branch], cwd=local_path, timeout=120)
    succeeded = code == 0
    if succeeded:
        log.info(f"Pull reussi: {local_path}")
    else:
        log.error(f"Pull echoue: {err}")
    return succeeded, out, err
def do_restore(local_path):
    """Restore modified/deleted tracked files via ``git checkout -- .``.

    Returns:
        tuple: ``(ok, stderr)`` where ``ok`` is True when git exited with 0.
    """
    log.info(f"Restauration: {local_path}")
    code, _, err = run_git(["checkout", "--", "."], cwd=local_path)
    succeeded = code == 0
    if succeeded:
        log.info(f"Restauration reussie: {local_path}")
    else:
        log.error(f"Restauration echouee: {err}")
    return succeeded, err
# ── Interface graphique ──────────────────────────────────────────────────────
class App(tk.Tk):
    """Main window: one card per configured repository plus an operations log.

    Long-running work (HTTP check, git commands) runs in worker threads;
    results are handed back to the Tk event loop with ``self.after``.
    """

    @staticmethod
    def _find_icon(filename="icon.png"):
        """Locate an icon file.

        Looks next to the program first, then (frozen build only) in the
        bundle directory ``sys._MEIPASS``. Returns a ``Path`` or ``None``.
        """
        p = get_exe_dir() / filename
        if p.exists():
            return p
        if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
            p = Path(sys._MEIPASS) / filename
            if p.exists():
                return p
        return None

    def __init__(self):
        """Build the window, load the configuration and schedule the first check."""
        super().__init__()
        self.title(f"Git Update Checker v{VERSION}")
        self.geometry("820x600")
        self.minsize(700, 450)
        self.configure(bg="#1e1e2e")
        self.repos_config = load_repos()
        self.repo_results = []
        # Number of batch updates still running (see _update_all).
        self._batch_remaining = 0

        # Window icon (title bar + taskbar)
        self._app_icon = None
        icon_path = self._find_icon()
        if icon_path:
            try:
                self._app_icon = tk.PhotoImage(file=str(icon_path))
                self.iconphoto(True, self._app_icon)
            except Exception as e:
                log.warning(f"Icone fenetre: {e}")
                self._app_icon = None

        log.info(f"=== Demarrage Git Update Checker v{VERSION} ===")
        self._cleanup_old_exe()
        self._build_ui()
        self.after(100, self._check_self_update_then_repos)

    def _cleanup_old_exe(self):
        """Delete the ``.exe.old`` file left behind by a previous self-update."""
        if getattr(sys, "frozen", False):
            old_path = Path(sys.executable).with_suffix(".exe.old")
            if old_path.exists():
                try:
                    old_path.unlink()
                    log.info(f"Nettoyage: {old_path.name} supprime")
                except OSError:
                    log.warning(f"Nettoyage: impossible de supprimer {old_path.name}")

    def _build_ui(self):
        """Create styles and all widgets of the main window."""
        style = ttk.Style(self)
        style.theme_use("clam")

        # Dark colour palette
        bg = "#1e1e2e"
        fg = "#cdd6f4"
        accent = "#89b4fa"
        green = "#a6e3a1"
        yellow = "#f9e2af"
        red = "#f38ba8"

        style.configure("TFrame", background=bg)
        style.configure("TLabel", background=bg, foreground=fg, font=("Segoe UI", 10))
        style.configure("Title.TLabel", background=bg, foreground=fg, font=("Segoe UI", 14, "bold"))
        style.configure("Status.TLabel", background=bg, foreground=accent, font=("Segoe UI", 9))
        style.configure("TButton", font=("Segoe UI", 10))
        style.configure("Green.TLabel", background=bg, foreground=green, font=("Segoe UI", 10, "bold"))
        style.configure("Yellow.TLabel", background=bg, foreground=yellow, font=("Segoe UI", 10, "bold"))
        style.configure("Red.TLabel", background=bg, foreground=red, font=("Segoe UI", 10, "bold"))

        # Header
        header = ttk.Frame(self)
        header.pack(fill="x", padx=15, pady=(15, 5))

        # Small icon in the top-left corner (icon_small.png)
        small_path = self._find_icon("icon_small.png")
        if small_path:
            try:
                # Keep a reference on self so the image is not garbage-collected.
                self._icon_small = tk.PhotoImage(file=str(small_path))
                tk.Label(header, image=self._icon_small, bg=bg, bd=0).pack(side="left", padx=(0, 8))
            except Exception as e:
                log.warning(f"Icone header: {e}")

        ttk.Label(header, text=f"Git Update Checker v{VERSION}", style="Title.TLabel").pack(side="left")
        self.status_label = ttk.Label(header, text="Verification en cours...", style="Status.TLabel")
        self.status_label.pack(side="right")

        # Date label (refreshed every second)
        self.date_label = ttk.Label(self, text="", style="Status.TLabel")
        self.date_label.pack(anchor="w", padx=15)
        self._tick_clock()

        # Main panel (vertical PanedWindow: cards on top, log at the bottom)
        paned = ttk.PanedWindow(self, orient="vertical")
        paned.pack(fill="both", expand=True, padx=15, pady=10)

        # Scrollable area for the repository cards
        container = ttk.Frame(paned)
        paned.add(container, weight=3)
        self.canvas = tk.Canvas(container, bg=bg, highlightthickness=0)
        scrollbar = ttk.Scrollbar(container, orient="vertical", command=self.canvas.yview)
        self.scroll_frame = ttk.Frame(self.canvas)
        self.scroll_frame.bind("<Configure>", lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all")))
        self.canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw")
        self.canvas.configure(yscrollcommand=scrollbar.set)
        self.canvas.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

        # Mouse wheel: the handler picks the widget under the cursor
        self.bind_all("<MouseWheel>", self._on_mousewheel)

        # Log panel at the bottom
        log_frame = tk.Frame(paned, bg="#181825")
        paned.add(log_frame, weight=1)
        log_header = tk.Frame(log_frame, bg="#181825")
        log_header.pack(fill="x", padx=8, pady=(6, 2))
        tk.Label(log_header, text="Journal des operations", bg="#181825", fg="#a6adc8", font=("Segoe UI", 9, "bold")).pack(side="left")
        tk.Button(log_header, text="Effacer", bg="#313244", fg="#cdd6f4", bd=0, font=("Segoe UI", 8),
                  command=self._clear_log_panel, activebackground="#45475a", activeforeground="#cdd6f4").pack(side="right")
        self.log_text = tk.Text(log_frame, bg="#11111b", fg="#cdd6f4", font=("Consolas", 9),
                                height=8, bd=0, highlightthickness=0, wrap="word",
                                state="disabled", padx=8, pady=4)
        log_scroll = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=log_scroll.set)
        self.log_text.pack(side="left", fill="both", expand=True, padx=(8, 0), pady=(0, 8))
        log_scroll.pack(side="right", fill="y", padx=(0, 8), pady=(0, 8))

        # Colour tags for the GUI log
        self.log_text.tag_configure("info", foreground="#cdd6f4")
        self.log_text.tag_configure("success", foreground="#a6e3a1")
        self.log_text.tag_configure("warning", foreground="#f9e2af")
        self.log_text.tag_configure("error", foreground="#f38ba8")
        self.log_text.tag_configure("file_add", foreground="#a6e3a1")
        self.log_text.tag_configure("file_mod", foreground="#f9e2af")
        self.log_text.tag_configure("file_del", foreground="#f38ba8")
        self.log_text.tag_configure("dim", foreground="#6c7086")

        # Bottom buttons
        btn_frame = ttk.Frame(self)
        btn_frame.pack(fill="x", padx=15, pady=(0, 15))
        self.btn_refresh = ttk.Button(btn_frame, text="Rafraichir", command=self._start_check)
        self.btn_refresh.pack(side="left", padx=(0, 10))
        self.btn_update_all = ttk.Button(btn_frame, text="Tout mettre a jour", command=self._update_all)
        self.btn_update_all.pack(side="left")
        self.btn_update_all.state(["disabled"])
        ttk.Button(btn_frame, text="Ouvrir config.ini", command=self._open_config).pack(side="right")
        self.btn_open_log = ttk.Button(btn_frame, text="Ouvrir les logs", command=self._open_log)
        self.btn_open_log.pack(side="right", padx=(0, 10))

    def _tick_clock(self):
        """Refresh the date label and reschedule itself one second later."""
        self.date_label.configure(text=datetime.now().strftime(" %d/%m/%Y %H:%M:%S"))
        self.after(1000, self._tick_clock)

    def _on_mousewheel(self, event):
        """Scroll the card canvas or the log, depending on the widget under the cursor.

        Walks up the ``master`` chain from ``event.widget`` until the canvas,
        the card frame or the log text is found; does nothing otherwise.
        """
        w = event.widget
        while w is not None:
            if w is self.canvas or w is self.scroll_frame:
                self.canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")
                return
            if w is self.log_text:
                self.log_text.yview_scroll(int(-1 * (event.delta / 120)), "units")
                return
            try:
                w = w.master
            except AttributeError:
                break

    def _check_self_update_then_repos(self):
        """Check for a program update in a worker thread, then continue with the repos."""
        self.status_label.configure(text="Verification auto-update...")

        def work():
            needs, info = check_self_update()
            self.after(0, lambda: self._handle_self_update(needs, info))

        threading.Thread(target=work, daemon=True).start()

    def _log_gui(self, message, tag="info"):
        """Append a timestamped line to the log panel.

        When called from a worker thread the insertion is deferred to the
        Tk event loop with ``self.after``.
        """
        def _append():
            self.log_text.configure(state="normal")
            timestamp = datetime.now().strftime("%H:%M:%S")
            self.log_text.insert("end", f"[{timestamp}] ", "dim")
            self.log_text.insert("end", f"{message}\n", tag)
            self.log_text.see("end")
            self.log_text.configure(state="disabled")

        if threading.current_thread() is threading.main_thread():
            _append()
        else:
            self.after(0, _append)

    def _clear_log_panel(self):
        """Erase the content of the log panel."""
        self.log_text.configure(state="normal")
        self.log_text.delete("1.0", "end")
        self.log_text.configure(state="disabled")

    def _handle_self_update(self, needs_update, info):
        """Offer the program update if one exists; otherwise start the repo check."""
        if needs_update:
            answer = messagebox.askyesno(
                "Mise a jour du programme",
                f"Une mise a jour du programme est disponible !\n\n{info}\n\nMettre a jour maintenant ?",
            )
            if answer:
                self.status_label.configure(text="Mise a jour du programme...")

                def work():
                    ok, msg = do_self_update()
                    self.after(0, lambda: self._self_update_done(ok, msg))

                threading.Thread(target=work, daemon=True).start()
                return
        self._start_check()

    def _self_update_done(self, ok, msg):
        """Callback after the self-update: relaunch on success, else warn and continue."""
        if ok:
            messagebox.showinfo("Auto-update", msg)
            log.info("Auto-update appliquee, relance...")
            relaunch_program()
            self.destroy()
            return
        messagebox.showwarning("Auto-update", msg)
        self._start_check()

    def _start_check(self):
        """Reload config.ini and check every repository in a worker thread."""
        self.btn_refresh.state(["disabled"])
        self.btn_update_all.state(["disabled"])
        self.status_label.configure(text="Verification en cours...")
        self._log_gui("Verification des depots...", "info")
        self.repos_config = load_repos()
        if not self.repos_config:
            self._clear_cards()
            self.status_label.configure(text="Aucun depot configure")
            self._show_no_repos()
            self.btn_refresh.state(["!disabled"])
            return
        threading.Thread(target=self._check_all, daemon=True).start()

    def _check_all(self):
        """Check all repositories in parallel (at most 4 at a time).

        Runs in a worker thread. An unexpected exception while checking one
        repository is converted into an error result for that repository so
        that the remaining results are still displayed and the Refresh
        button is re-enabled.
        """
        # Snapshot: the attribute may be replaced by the main thread.
        repos = list(self.repos_config)
        total = len(repos)
        results = [None] * total
        done = 0
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {}
            for i, repo in enumerate(repos):
                self._log_gui(f"[{repo['name']}] Verification...", "dim")
                futures[executor.submit(check_repo, repo)] = i
            for future in as_completed(futures):
                idx = futures[future]
                try:
                    res = future.result()
                except Exception as exc:
                    repo = repos[idx]
                    log.error(f"[{repo['name']}] Erreur inattendue: {exc}")
                    res = {
                        "name": repo["name"],
                        "url": repo["url"],
                        "local_path": "",
                        "relative_path": repo["path"],
                        "exists": False,
                        "up_to_date": False,
                        "error": f"Erreur inattendue : {exc}",
                        "commits": [],
                        "files": [],
                        "local_changes": [],
                        "local_only": False,
                        "branch": "",
                        "local_hash": "",
                        "remote_hash": "",
                        "needs_clone": False,
                        "offline": False,
                    }
                results[idx] = res
                done += 1
                # Short summary line according to the status
                if res.get("offline"):
                    self._log_gui(f"[{res['name']}] Hors ligne", "warning")
                elif res.get("error"):
                    self._log_gui(f"[{res['name']}] Erreur : {res['error']}", "error")
                elif res.get("needs_clone"):
                    self._log_gui(f"[{res['name']}] A cloner", "warning")
                elif res["up_to_date"]:
                    self._log_gui(f"[{res['name']}] A jour", "success")
                else:
                    count = len(res["commits"]) + len(res["local_changes"])
                    self._log_gui(f"[{res['name']}] {count} changement(s) disponible(s)", "warning")
                self.after(0, lambda d=done, t=total: self.status_label.configure(text=f"Verification {d}/{t}..."))
        self.repo_results = results
        self.after(0, self._display_results)

    def _display_results(self):
        """Rebuild the cards from ``self.repo_results`` and update status and buttons."""
        self._clear_cards()
        has_any_updates = False
        for res in self.repo_results:
            card = self._create_card(res)
            card.pack(fill="x", pady=5)
            if not res["up_to_date"] and not res.get("error") and not res.get("offline") and not res.get("needs_clone"):
                has_any_updates = True

        total = len(self.repo_results)
        up = sum(1 for r in self.repo_results if r["up_to_date"])
        log.info(f"Resultat: {up}/{total} depots a jour")
        self._log_gui(f"Verification terminee : {up}/{total} depots a jour", "success" if up == total else "warning")
        self.status_label.configure(text=f"{up}/{total} depots a jour")
        self.btn_refresh.state(["!disabled"])
        if has_any_updates:
            self.btn_update_all.state(["!disabled"])

    def _clear_cards(self):
        """Destroy every widget inside the scrollable card area."""
        for w in self.scroll_frame.winfo_children():
            w.destroy()

    def _create_card(self, res):
        """Create (without packing) the card widget for one repository result.

        The action button, when present, is stored in ``res["_btn"]`` so it
        can be disabled once clicked.
        """
        bg_card = "#313244"
        fg = "#cdd6f4"
        green = "#a6e3a1"
        yellow = "#f9e2af"
        red = "#f38ba8"
        cyan = "#89dceb"
        dim = "#6c7086"

        card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a")

        # Card header: name on the left, status badge on the right
        top = tk.Frame(card, bg=bg_card)
        top.pack(fill="x", padx=12, pady=(10, 5))
        tk.Label(top, text=res["name"], bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(side="left")
        if res.get("offline"):
            tk.Label(top, text="HORS LIGNE", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")
        elif res.get("error"):
            tk.Label(top, text="ERREUR", bg=bg_card, fg=red, font=("Segoe UI", 9, "bold")).pack(side="right")
        elif res.get("needs_clone"):
            tk.Label(top, text="A CLONER", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")
        elif res["up_to_date"]:
            tk.Label(top, text="A JOUR", bg=bg_card, fg=green, font=("Segoe UI", 9, "bold")).pack(side="right")
        else:
            count = len(res["commits"]) + len(res["local_changes"])
            tk.Label(top, text=f"MAJ DISPO ({count})", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right")

        # Path / branch information
        info = tk.Frame(card, bg=bg_card)
        info.pack(fill="x", padx=12)
        tk.Label(info, text=f"Chemin : {res['relative_path']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w")
        if res["branch"]:
            tk.Label(info, text=f"Branche : {res['branch']} | {res['local_hash']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w")

        # Error text
        if res.get("error"):
            tk.Label(card, text=f" {res['error']}", bg=bg_card, fg=red, font=("Segoe UI", 9), anchor="w").pack(fill="x", padx=12, pady=(5, 0))

        # Remote commits (first 5)
        if res["commits"]:
            tk.Label(card, text=f"Nouveaux commits ({len(res['commits'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(8, 2))
            for c in res["commits"][:5]:
                line_frame = tk.Frame(card, bg=bg_card)
                line_frame.pack(fill="x", padx=20)
                tk.Label(line_frame, text=c["hash"], bg=bg_card, fg=cyan, font=("Consolas", 9)).pack(side="left")
                tk.Label(line_frame, text=f" {c['message']}", bg=bg_card, fg=fg, font=("Segoe UI", 9), anchor="w").pack(side="left", fill="x")
            if len(res["commits"]) > 5:
                tk.Label(card, text=f" ... et {len(res['commits']) - 5} autres commits", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)

        # Files changed on the remote (first 8)
        if res["files"]:
            tk.Label(card, text=f"Fichiers distants modifies ({len(res['files'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2))
            color_map = {"A": green, "M": yellow, "D": red, "R": cyan}
            for f in res["files"][:8]:
                line_frame = tk.Frame(card, bg=bg_card)
                line_frame.pack(fill="x", padx=20)
                c = color_map.get(f["status_char"], fg)
                tk.Label(line_frame, text=f"[{f['status']:>9}]", bg=bg_card, fg=c, font=("Consolas", 9)).pack(side="left")
                tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left")
            if len(res["files"]) > 8:
                tk.Label(card, text=f" ... et {len(res['files']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)

        # Local files to restore (first 8)
        if res["local_changes"]:
            tk.Label(card, text=f"Fichiers locaux a restaurer ({len(res['local_changes'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2))
            for f in res["local_changes"][:8]:
                line_frame = tk.Frame(card, bg=bg_card)
                line_frame.pack(fill="x", padx=20)
                c = red if f["status_char"] == "D" else yellow
                tk.Label(line_frame, text=f"[{f['status']:>20}]", bg=bg_card, fg=c, font=("Consolas", 8)).pack(side="left")
                tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left")
            if len(res["local_changes"]) > 8:
                tk.Label(card, text=f" ... et {len(res['local_changes']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20)

        # Action button
        if not res["up_to_date"] and not res.get("error"):
            btn_frame = tk.Frame(card, bg=bg_card)
            btn_frame.pack(fill="x", padx=12, pady=(8, 0))
            if res.get("needs_clone"):
                btn = ttk.Button(btn_frame, text="Cloner le depot", command=lambda r=res: self._do_clone(r))
            else:
                btn = ttk.Button(btn_frame, text="Mettre a jour", command=lambda r=res: self._do_update(r))
            btn.pack(side="left")
            # Keep a reference to disable the button later
            res["_btn"] = btn

        # Bottom padding
        tk.Frame(card, bg=bg_card, height=10).pack()
        return card

    def _do_update(self, res, batch=False):
        """Update one repository in a worker thread (restore, then pull).

        Local modifications are restored BEFORE pulling: git refuses to pull
        when a locally modified file is also changed by the incoming commits,
        so pulling first could fail and require a second click.

        Args:
            res: result dict produced by ``check_repo``.
            batch: True when called from ``_update_all`` (no per-repo dialog).
        """
        name = res["name"]
        log.info(f"[{name}] MAJ unitaire demandee")
        self._log_gui(f"[{name}] Mise a jour en cours...", "info")
        if "_btn" in res:
            res["_btn"].state(["disabled"])

        def work():
            messages = []
            success = True
            local_path = res["local_path"]
            branch = res["branch"]
            tag_map = {"A": "file_add", "M": "file_mod", "D": "file_del", "R": "file_mod"}

            if res["local_changes"]:
                log.info(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...")
                self._log_gui(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...", "info")
                ok, err = do_restore(local_path)
                if ok:
                    msg = f"Restauration OK : {len(res['local_changes'])} fichiers restaures."
                    log.info(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "success")
                    # Log every restored file
                    for f in res["local_changes"]:
                        tag = tag_map.get(f["status_char"], "info")
                        self._log_gui(f" [Restaure] {f['file']}", tag)
                else:
                    msg = f"Erreur restauration : {err}"
                    log.error(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "error")
                    success = False
                messages.append(msg)

            if res["commits"]:
                log.info(f"[{name}] Pull de {len(res['commits'])} commit(s)...")
                self._log_gui(f"[{name}] Telechargement de {len(res['commits'])} commit(s)...", "info")
                ok, out, err = do_pull(local_path, branch)
                if ok:
                    msg = f"Pull OK : {len(res['commits'])} commits telecharges."
                    log.info(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "success")
                    # Log every remote file
                    for f in res.get("files", []):
                        tag = tag_map.get(f["status_char"], "info")
                        self._log_gui(f" [{f['status']:>9}] {f['file']}", tag)
                else:
                    msg = f"Erreur pull : {err}"
                    log.error(f"[{name}] {msg}")
                    self._log_gui(f"[{name}] {msg}", "error")
                    success = False
                messages.append(msg)

            status = "SUCCES" if success else "ECHEC"
            log.info(f"[{name}] MAJ unitaire terminee - {status}")
            self._log_gui(f"[{name}] Termine - {status}", "success" if success else "error")
            self.after(0, lambda: self._show_update_result(res, messages, success, batch=batch))

        threading.Thread(target=work, daemon=True).start()

    def _do_clone(self, res):
        """Clone one repository in a worker thread, report the outcome, then re-check."""
        name = res["name"]
        log.info(f"[{name}] Clonage demande - {res['url']}")
        self._log_gui(f"[{name}] Clonage en cours depuis {res['url']}...", "info")
        if "_btn" in res:
            res["_btn"].state(["disabled"])
        repo = {"url": res["url"], "path": res["relative_path"]}

        def work():
            ok, err = do_clone(repo)
            if ok:
                msg = f"Depot '{name}' clone avec succes !"
                log.info(f"[{name}] {msg}")
                self._log_gui(f"[{name}] {msg}", "success")
            else:
                msg = f"Erreur de clonage : {err}"
                log.error(f"[{name}] {msg}")
                self._log_gui(f"[{name}] {msg}", "error")
            # A failure is shown as an error dialog, not as an information.
            notify = messagebox.showinfo if ok else messagebox.showerror
            self.after(0, lambda: notify("Clonage", msg))
            self.after(100, self._start_check)

        threading.Thread(target=work, daemon=True).start()

    def _show_update_result(self, res, messages, success, batch=False):
        """Report the outcome of one update and trigger a new check.

        In batch mode no dialog is shown and the check is started only once,
        when the last pending update has finished.
        """
        if batch:
            self._batch_remaining -= 1
            if self._batch_remaining == 0:
                self._start_check()
        else:
            title = "Mise a jour" if success else "Erreur"
            notify = messagebox.showinfo if success else messagebox.showwarning
            notify(title, f"{res['name']}\n\n" + "\n".join(messages))
            self._start_check()

    def _update_all(self):
        """Update every repository that has pending changes."""
        to_update = [r for r in self.repo_results if not r["up_to_date"] and not r.get("error") and not r.get("needs_clone")]
        if not to_update:
            return
        log.info(f"MAJ globale demandee - {len(to_update)} depot(s) a mettre a jour")
        self._log_gui(f"MAJ globale : {len(to_update)} depot(s) a mettre a jour", "warning")
        # Block a second click while the batch runs: it would reset the
        # counter below. _display_results re-enables the buttons afterwards.
        self.btn_refresh.state(["disabled"])
        self.btn_update_all.state(["disabled"])
        # Counter so that _start_check runs only once, after the last update
        self._batch_remaining = len(to_update)
        for res in to_update:
            self._do_update(res, batch=True)

    def _show_no_repos(self):
        """Show a help card explaining how to add repositories to config.ini."""
        bg_card = "#313244"
        fg = "#cdd6f4"
        dim = "#6c7086"
        card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a")
        card.pack(fill="x", pady=5)
        tk.Label(card, text="Aucun depot configure", bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(padx=12, pady=(12, 5))
        tk.Label(card, text="Edite config.ini pour ajouter des depots :", bg=bg_card, fg=dim, font=("Segoe UI", 9)).pack(padx=12)
        example = (
            "[repo:MonRepo]\n"
            "url = http://192.168.1.235:3125/user/repo\n"
            "path = ../MonRepo"
        )
        tk.Label(card, text=example, bg="#1e1e2e", fg="#a6e3a1", font=("Consolas", 9), justify="left", padx=10, pady=8).pack(padx=12, pady=8, fill="x")
        tk.Frame(card, bg=bg_card, height=10).pack()

    def _open_path(self, path):
        """Open *path* with the platform's default application.

        Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
        elsewhere. A failure is logged and shown in a warning dialog.
        """
        try:
            if sys.platform == "win32":
                os.startfile(path)
            elif sys.platform == "darwin":
                subprocess.Popen(["open", path])
            else:
                subprocess.Popen(["xdg-open", path])
        except OSError as e:
            log.warning(f"Ouverture impossible: {path} ({e})")
            messagebox.showwarning("Ouverture impossible", f"{path}\n\n{e}")

    def _open_config(self):
        """Open config.ini in the default application."""
        self._open_path(str(get_config_path()))

    def _open_log(self):
        """Open the log folder in the file manager."""
        self._open_path(str(get_exe_dir() / "log"))
# Script entry point: create the main window and run the Tk event loop.
if __name__ == "__main__":
    app = App()
    app.mainloop()