""" Git Update Checker - GUI multi-repo. Vérifie les mises à jour de plusieurs dépôts Git et propose de les télécharger. Accès lecture seule uniquement (fetch/pull/checkout, jamais de push). Tous les chemins sont relatifs à l'emplacement de l'exécutable. """ VERSION = "0.5.1" import subprocess import sys import os import configparser import logging import threading import tkinter as tk from tkinter import ttk, messagebox from pathlib import Path from datetime import datetime import urllib.request import urllib.error # Forcer UTF-8 sur Windows if sys.platform == "win32": os.system("chcp 65001 >nul 2>&1") try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") except Exception: pass # ── Utilitaires ────────────────────────────────────────────────────────────── def get_exe_dir(): if getattr(sys, "frozen", False): return Path(sys.executable).parent return Path(__file__).parent def resolve_relative(path_str): """Résout un chemin relatif par rapport au dossier de l'exe.""" p = Path(path_str) if not p.is_absolute(): p = get_exe_dir() / p return p.resolve() # ── Logging ────────────────────────────────────────────────────────────────── def setup_logging(): """Configure le logging dans un dossier log/ à côté de l'exe.""" log_dir = get_exe_dir() / "log" log_dir.mkdir(exist_ok=True) log_file = log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.log" logger = logging.getLogger("GitUpdateChecker") logger.setLevel(logging.DEBUG) # Handler fichier fh = logging.FileHandler(log_file, encoding="utf-8") fh.setLevel(logging.DEBUG) fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")) logger.addHandler(fh) # Nettoyage des vieux logs (garder 30 jours) for old_log in sorted(log_dir.glob("*.log"))[:-30]: try: old_log.unlink() except OSError: pass return logger log = setup_logging() def run_git(args, cwd=None): # -c safe.directory=* : évite l'erreur "dubious ownership" sur clé USB cmd = ["git", "-c", "safe.directory=*"] + args log.debug(f"git {' '.join(args)} (cwd={cwd})") try: result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=30, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, ) if result.returncode != 0 and result.stderr.strip(): log.warning(f"git {args[0]} erreur: {result.stderr.strip()}") return result.returncode, result.stdout.strip(), result.stderr.strip() except FileNotFoundError: log.error("Git non trouve dans le PATH") return -1, "", "Git n'est pas installe ou pas dans le PATH." except subprocess.TimeoutExpired: log.error(f"Timeout: git {' '.join(args)}") return 1, "", "Timeout" # ── Auto-update du programme ───────────────────────────────────────────────── def _version_tuple(v): """Convertit '0.4' en (0, 4) pour comparaison.""" try: return tuple(int(x) for x in v.strip().split(".")) except (ValueError, AttributeError): return (0,) def _get_self_update_config(): """Lit la config [self-update] depuis config.ini. Retourne (url, exe_name) ou (None, None).""" config_path = get_config_path() if not config_path.exists(): return None, None config = configparser.ConfigParser() config.read(config_path, encoding="utf-8") if not config.has_section("self-update"): return None, None url = config.get("self-update", "url", fallback="").strip().rstrip("/") exe_name = config.get("self-update", "exe_name", fallback="GitUpdateChecker.exe").strip() if not url: return None, None return url, exe_name def check_self_update(): """ Vérifie si une nouvelle version est disponible sur le serveur Gitea. Télécharge version.txt via HTTP et compare avec VERSION locale. Retourne (needs_update: bool, info: str). """ repo_url, _ = _get_self_update_config() if not repo_url: log.info("Auto-update: pas de section [self-update] dans config.ini, skip") return False, "" log.info("Auto-update: verification via HTTP...") version_url = f"{repo_url}/raw/branch/master/version.txt" try: req = urllib.request.Request(version_url, headers={"User-Agent": "GitUpdateChecker"}) with urllib.request.urlopen(req, timeout=10) as resp: remote_version = resp.read().decode("utf-8").strip() except (urllib.error.URLError, OSError) as e: log.warning(f"Auto-update: impossible de verifier la version distante: {e}") return False, f"Impossible de contacter le serveur: {e}" log.info(f"Auto-update: version locale={VERSION} distante={remote_version}") if _version_tuple(remote_version) <= _version_tuple(VERSION): log.info("Auto-update: programme a jour") return False, "" info = f"Version actuelle : {VERSION}\nVersion disponible : {remote_version}" log.info(f"Auto-update: MAJ disponible - {remote_version}") return True, info def do_self_update(): """ Télécharge le nouvel exe depuis le serveur Gitea. Stratégie : télécharger dans .new, renommer l'exe actuel en .old, placer le nouveau. Retourne (ok, message). """ repo_url, exe_name = _get_self_update_config() if not repo_url: return False, "Configuration [self-update] manquante" is_frozen = getattr(sys, "frozen", False) if not is_frozen: log.warning("Auto-update: mode script, telechargement non supporte") return False, "Auto-update uniquement supporte en mode .exe" exe_path = Path(sys.executable) exe_old_path = exe_path.with_suffix(".exe.old") exe_new_path = exe_path.with_suffix(".exe.new") # Telecharger le nouvel exe download_url = f"{repo_url}/raw/branch/master/{exe_name}" log.info(f"Auto-update: telechargement de {download_url}") try: req = urllib.request.Request(download_url, headers={"User-Agent": "GitUpdateChecker"}) with urllib.request.urlopen(req, timeout=60) as resp: data = resp.read() if len(data) < 1000: log.error(f"Auto-update: fichier telecharge trop petit ({len(data)} octets)") return False, "Le fichier telecharge semble invalide" with open(exe_new_path, "wb") as f: f.write(data) log.info(f"Auto-update: telecharge {len(data)} octets -> {exe_new_path.name}") except (urllib.error.URLError, OSError) as e: log.error(f"Auto-update: echec telechargement: {e}") if exe_new_path.exists(): try: exe_new_path.unlink() except OSError: pass return False, f"Erreur telechargement: {e}" # Renommer : exe actuel -> .old try: if exe_old_path.exists(): exe_old_path.unlink() exe_path.rename(exe_old_path) log.info(f"Auto-update: {exe_path.name} -> {exe_old_path.name}") except OSError as e: log.error(f"Auto-update: impossible de renommer l'exe: {e}") if exe_new_path.exists(): try: exe_new_path.unlink() except OSError: pass return False, f"Impossible de renommer l'exe: {e}" # Renommer : .new -> exe try: exe_new_path.rename(exe_path) log.info(f"Auto-update: {exe_new_path.name} -> {exe_path.name}") except OSError as e: log.error(f"Auto-update: impossible de placer le nouvel exe: {e}") try: exe_old_path.rename(exe_path) log.info("Auto-update: ancien exe restaure") except OSError: pass return False, f"Impossible de placer le nouvel exe: {e}" return True, "Mise a jour reussie !\nLe programme va redemarrer." def relaunch_program(): """Relance le programme (nouvel exe) et quitte le processus actuel.""" if getattr(sys, "frozen", False): exe_path = str(Path(sys.executable)) log.info(f"Auto-update: relance de {exe_path}") # Lancer un batch qui attend 1s puis lance le nouvel exe et supprime l'ancien .old bat_path = str(get_exe_dir() / "_update.bat") bat_content = ( f'@echo off\n' f'timeout /t 1 /nobreak >nul\n' f'start "" "{exe_path}"\n' f'del "{exe_path}.old" 2>nul\n' f'del "%~f0"\n' ) with open(bat_path, "w") as f: f.write(bat_content) subprocess.Popen( ["cmd", "/c", bat_path], creationflags=subprocess.CREATE_NO_WINDOW, ) else: # Mode script : relancer python log.info("Auto-update: relance du script") subprocess.Popen([sys.executable, __file__]) # ── Configuration ──────────────────────────────────────────────────────────── def get_config_path(): return get_exe_dir() / "config.ini" def load_repos(): """Charge la liste des dépôts depuis config.ini.""" config_path = get_config_path() config = configparser.ConfigParser() repos = [] if not config_path.exists(): # Créer un config.ini exemple config["repo:Exemple"] = { "url": "http://192.168.1.235:3125/user/repo", "path": "../MonRepo", } with open(config_path, "w", encoding="utf-8") as f: f.write("; Configuration des depots Git a surveiller\n") f.write("; Les chemins (path) sont relatifs a l'emplacement de l'exe\n") f.write("; Ajouter autant de sections [repo:NomDuRepo] que necessaire\n\n") config.write(f) return repos config.read(config_path, encoding="utf-8") for section in config.sections(): if section.startswith("repo:"): name = section[5:] url = config.get(section, "url", fallback="").strip() path = config.get(section, "path", fallback="").strip() if url and path: repos.append({"name": name, "url": url, "path": path}) return repos # ── Logique Git ────────────────────────────────────────────────────────────── def check_repo(repo): """Vérifie un dépôt et retourne son état.""" name = repo["name"] url = repo["url"] local_path = str(resolve_relative(repo["path"])) log.info(f"[{name}] Verification - {url} -> {local_path}") result = { "name": name, "url": url, "local_path": local_path, "relative_path": repo["path"], "exists": False, "up_to_date": False, "error": None, "commits": [], "files": [], "local_changes": [], "local_only": False, "branch": "", "local_hash": "", "remote_hash": "", "needs_clone": False, } # Vérifier si le dossier existe et est un repo git if not os.path.isdir(os.path.join(local_path, ".git")): if os.path.isdir(local_path) and os.listdir(local_path): result["error"] = "Le dossier existe mais n'est pas un depot git." return result result["needs_clone"] = True return result result["exists"] = True # Branche courante code, branch, _ = run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=local_path) if code != 0: result["error"] = "Impossible de lire la branche courante." return result result["branch"] = branch # Fetch code, _, err = run_git(["fetch", "origin"], cwd=local_path) if code != 0: result["error"] = f"Erreur fetch : {err}" return result # Comparer HEAD local vs origin code, local_hash, _ = run_git(["rev-parse", "HEAD"], cwd=local_path) if code != 0: result["error"] = "Impossible de lire le commit local." return result code, remote_hash, _ = run_git(["rev-parse", f"origin/{branch}"], cwd=local_path) if code != 0: result["error"] = f"Impossible de trouver origin/{branch}." return result result["local_hash"] = local_hash[:8] result["remote_hash"] = remote_hash[:8] # Modifications locales code, local_diff, _ = run_git(["diff", "--name-status", "HEAD"], cwd=local_path) if code == 0 and local_diff: for line in local_diff.splitlines(): parts = line.split("\t", 1) if len(parts) == 2: sc = parts[0].strip() fn = parts[1].strip() status_map = {"D": "Supprime localement", "M": "Modifie localement"} result["local_changes"].append({ "status": status_map.get(sc[0], sc), "status_char": sc[0], "file": fn, }) if local_hash == remote_hash and not result["local_changes"]: log.info(f"[{name}] A jour ({local_hash[:8]})") result["up_to_date"] = True return result if local_hash == remote_hash: log.info(f"[{name}] {len(result['local_changes'])} fichier(s) locaux modifies/supprimes") result["local_only"] = True return result # Nouveaux commits distants code, log_out, _ = run_git( ["log", "--oneline", "--format=%h|%an|%ar|%s", f"HEAD..origin/{branch}"], cwd=local_path, ) if code == 0 and log_out: for line in log_out.splitlines(): parts = line.split("|", 3) if len(parts) == 4: result["commits"].append({ "hash": parts[0], "author": parts[1], "date": parts[2], "message": parts[3], }) # Fichiers distants modifiés code, diff_out, _ = run_git( ["diff", "--name-status", f"HEAD..origin/{branch}"], cwd=local_path, ) if code == 0 and diff_out: for line in diff_out.splitlines(): parts = line.split("\t", 1) if len(parts) == 2: sc = parts[0].strip() fn = parts[1].strip() status_map = {"A": "Ajoute", "M": "Modifie", "D": "Supprime", "R": "Renomme"} result["files"].append({ "status": status_map.get(sc[0], sc), "status_char": sc[0], "file": fn, }) return result def do_clone(repo): """Clone un dépôt.""" local_path = str(resolve_relative(repo["path"])) log.info(f"Clonage: {repo['url']} -> {local_path}") code, _, err = run_git(["clone", repo["url"], local_path]) if code == 0: log.info(f"Clonage reussi: {local_path}") else: log.error(f"Clonage echoue: {err}") return code == 0, err def do_pull(local_path, branch): """Pull les mises à jour (lecture seule).""" log.info(f"Pull: {local_path} (branche {branch})") code, out, err = run_git(["pull", "origin", branch], cwd=local_path) if code == 0: log.info(f"Pull reussi: {local_path}") else: log.error(f"Pull echoue: {err}") return code == 0, out, err def do_restore(local_path): """Restaure les fichiers locaux modifiés/supprimés.""" log.info(f"Restauration: {local_path}") code, _, err = run_git(["checkout", "--", "."], cwd=local_path) if code == 0: log.info(f"Restauration reussie: {local_path}") else: log.error(f"Restauration echouee: {err}") return code == 0, err # ── Interface graphique ────────────────────────────────────────────────────── class App(tk.Tk): def __init__(self): super().__init__() self.title(f"Git Update Checker v{VERSION}") self.geometry("820x600") self.minsize(700, 450) self.configure(bg="#1e1e2e") self.repos_config = load_repos() self.repo_results = [] log.info(f"=== Demarrage Git Update Checker v{VERSION} ===") self._cleanup_old_exe() self._build_ui() self.after(100, self._check_self_update_then_repos) def _cleanup_old_exe(self): """Supprime l'ancien exe .old restant d'une mise a jour precedente.""" if getattr(sys, "frozen", False): old_path = Path(sys.executable).with_suffix(".exe.old") if old_path.exists(): try: old_path.unlink() log.info(f"Nettoyage: {old_path.name} supprime") except OSError: log.warning(f"Nettoyage: impossible de supprimer {old_path.name}") def _build_ui(self): style = ttk.Style(self) style.theme_use("clam") # Couleurs sombres bg = "#1e1e2e" fg = "#cdd6f4" accent = "#89b4fa" green = "#a6e3a1" yellow = "#f9e2af" red = "#f38ba8" style.configure("TFrame", background=bg) style.configure("TLabel", background=bg, foreground=fg, font=("Segoe UI", 10)) style.configure("Title.TLabel", background=bg, foreground=fg, font=("Segoe UI", 14, "bold")) style.configure("Status.TLabel", background=bg, foreground=accent, font=("Segoe UI", 9)) style.configure("TButton", font=("Segoe UI", 10)) style.configure("Green.TLabel", background=bg, foreground=green, font=("Segoe UI", 10, "bold")) style.configure("Yellow.TLabel", background=bg, foreground=yellow, font=("Segoe UI", 10, "bold")) style.configure("Red.TLabel", background=bg, foreground=red, font=("Segoe UI", 10, "bold")) # Header header = ttk.Frame(self) header.pack(fill="x", padx=15, pady=(15, 5)) ttk.Label(header, text=f"Git Update Checker v{VERSION}", style="Title.TLabel").pack(side="left") self.status_label = ttk.Label(header, text="Verification en cours...", style="Status.TLabel") self.status_label.pack(side="right") # Date date_label = ttk.Label(self, text=datetime.now().strftime(" %d/%m/%Y %H:%M:%S"), style="Status.TLabel") date_label.pack(anchor="w", padx=15) # Panneau principal (PanedWindow vertical : cartes en haut, log en bas) paned = ttk.PanedWindow(self, orient="vertical") paned.pack(fill="both", expand=True, padx=15, pady=10) # Zone scrollable pour les repos container = ttk.Frame(paned) paned.add(container, weight=3) self.canvas = tk.Canvas(container, bg=bg, highlightthickness=0) scrollbar = ttk.Scrollbar(container, orient="vertical", command=self.canvas.yview) self.scroll_frame = ttk.Frame(self.canvas) self.scroll_frame.bind("", lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))) self.canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw") self.canvas.configure(yscrollcommand=scrollbar.set) self.canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Scroll avec la molette self.canvas.bind_all("", lambda e: self.canvas.yview_scroll(int(-1 * (e.delta / 120)), "units")) # Panneau de log en bas log_frame = tk.Frame(paned, bg="#181825") paned.add(log_frame, weight=1) log_header = tk.Frame(log_frame, bg="#181825") log_header.pack(fill="x", padx=8, pady=(6, 2)) tk.Label(log_header, text="Journal des operations", bg="#181825", fg="#a6adc8", font=("Segoe UI", 9, "bold")).pack(side="left") tk.Button(log_header, text="Effacer", bg="#313244", fg="#cdd6f4", bd=0, font=("Segoe UI", 8), command=self._clear_log_panel, activebackground="#45475a", activeforeground="#cdd6f4").pack(side="right") self.log_text = tk.Text(log_frame, bg="#11111b", fg="#cdd6f4", font=("Consolas", 9), height=8, bd=0, highlightthickness=0, wrap="word", state="disabled", padx=8, pady=4) log_scroll = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview) self.log_text.configure(yscrollcommand=log_scroll.set) self.log_text.pack(side="left", fill="both", expand=True, padx=(8, 0), pady=(0, 8)) log_scroll.pack(side="right", fill="y", padx=(0, 8), pady=(0, 8)) # Tags couleur pour le log GUI self.log_text.tag_configure("info", foreground="#cdd6f4") self.log_text.tag_configure("success", foreground="#a6e3a1") self.log_text.tag_configure("warning", foreground="#f9e2af") self.log_text.tag_configure("error", foreground="#f38ba8") self.log_text.tag_configure("file_add", foreground="#a6e3a1") self.log_text.tag_configure("file_mod", foreground="#f9e2af") self.log_text.tag_configure("file_del", foreground="#f38ba8") self.log_text.tag_configure("dim", foreground="#6c7086") # Boutons en bas btn_frame = ttk.Frame(self) btn_frame.pack(fill="x", padx=15, pady=(0, 15)) self.btn_refresh = ttk.Button(btn_frame, text="Rafraichir", command=self._start_check) self.btn_refresh.pack(side="left", padx=(0, 10)) self.btn_update_all = ttk.Button(btn_frame, text="Tout mettre a jour", command=self._update_all) self.btn_update_all.pack(side="left") self.btn_update_all.state(["disabled"]) ttk.Button(btn_frame, text="Ouvrir config.ini", command=self._open_config).pack(side="right") self.btn_open_log = ttk.Button(btn_frame, text="Ouvrir les logs", command=self._open_log) self.btn_open_log.pack(side="right", padx=(0, 10)) def _check_self_update_then_repos(self): """Vérifie d'abord la MAJ du programme, puis les repos.""" self.status_label.configure(text="Verification auto-update...") def work(): needs, info = check_self_update() self.after(0, lambda: self._handle_self_update(needs, info)) threading.Thread(target=work, daemon=True).start() def _log_gui(self, message, tag="info"): """Ajoute une ligne dans le panneau de log.""" def _append(): self.log_text.configure(state="normal") timestamp = datetime.now().strftime("%H:%M:%S") self.log_text.insert("end", f"[{timestamp}] ", "dim") self.log_text.insert("end", f"{message}\n", tag) self.log_text.see("end") self.log_text.configure(state="disabled") # Appel thread-safe if threading.current_thread() is threading.main_thread(): _append() else: self.after(0, _append) def _clear_log_panel(self): self.log_text.configure(state="normal") self.log_text.delete("1.0", "end") self.log_text.configure(state="disabled") def _handle_self_update(self, needs_update, info): """Gère le résultat de l'auto-update.""" if needs_update: answer = messagebox.askyesno( "Mise a jour du programme", f"Une mise a jour du programme est disponible !\n\n{info}\n\nMettre a jour maintenant ?", ) if answer: self.status_label.configure(text="Mise a jour du programme...") def work(): ok, msg = do_self_update() self.after(0, lambda: self._self_update_done(ok, msg)) threading.Thread(target=work, daemon=True).start() return self._start_check() def _self_update_done(self, ok, msg): """Callback après auto-update.""" if ok: messagebox.showinfo("Auto-update", msg) log.info("Auto-update appliquee, relance...") relaunch_program() self.destroy() return else: messagebox.showwarning("Auto-update", msg) self._start_check() def _start_check(self): """Lance la vérification dans un thread.""" self.btn_refresh.state(["disabled"]) self.btn_update_all.state(["disabled"]) self.status_label.configure(text="Verification en cours...") self._log_gui("Verification des depots...", "info") self.repos_config = load_repos() if not self.repos_config: self._clear_cards() self.status_label.configure(text="Aucun depot configure") self._show_no_repos() self.btn_refresh.state(["!disabled"]) return threading.Thread(target=self._check_all, daemon=True).start() def _check_all(self): """Vérifie tous les repos (dans un thread).""" results = [] for repo in self.repos_config: results.append(check_repo(repo)) self.repo_results = results self.after(0, self._display_results) def _display_results(self): """Affiche les résultats dans la GUI.""" self._clear_cards() has_any_updates = False for res in self.repo_results: card = self._create_card(res) card.pack(fill="x", pady=5) if not res["up_to_date"] and not res.get("error"): has_any_updates = True total = len(self.repo_results) up = sum(1 for r in self.repo_results if r["up_to_date"]) log.info(f"Resultat: {up}/{total} depots a jour") self._log_gui(f"Verification terminee : {up}/{total} depots a jour", "success" if up == total else "warning") self.status_label.configure(text=f"{up}/{total} depots a jour") self.btn_refresh.state(["!disabled"]) if has_any_updates: self.btn_update_all.state(["!disabled"]) def _clear_cards(self): for w in self.scroll_frame.winfo_children(): w.destroy() def _create_card(self, res): """Crée une carte pour un dépôt.""" bg_card = "#313244" fg = "#cdd6f4" green = "#a6e3a1" yellow = "#f9e2af" red = "#f38ba8" cyan = "#89dceb" dim = "#6c7086" card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a") # Header de la carte top = tk.Frame(card, bg=bg_card) top.pack(fill="x", padx=12, pady=(10, 5)) tk.Label(top, text=res["name"], bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(side="left") if res.get("error"): tk.Label(top, text="ERREUR", bg=bg_card, fg=red, font=("Segoe UI", 9, "bold")).pack(side="right") elif res.get("needs_clone"): tk.Label(top, text="A CLONER", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right") elif res["up_to_date"]: tk.Label(top, text="A JOUR", bg=bg_card, fg=green, font=("Segoe UI", 9, "bold")).pack(side="right") else: count = len(res["commits"]) + len(res["local_changes"]) tk.Label(top, text=f"MAJ DISPO ({count})", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right") # Infos info = tk.Frame(card, bg=bg_card) info.pack(fill="x", padx=12) tk.Label(info, text=f"Chemin : {res['relative_path']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w") if res["branch"]: tk.Label(info, text=f"Branche : {res['branch']} | {res['local_hash']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w") # Erreur if res.get("error"): tk.Label(card, text=f" {res['error']}", bg=bg_card, fg=red, font=("Segoe UI", 9), anchor="w").pack(fill="x", padx=12, pady=(5, 0)) # Commits distants if res["commits"]: tk.Label(card, text=f"Nouveaux commits ({len(res['commits'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(8, 2)) for c in res["commits"][:5]: line_frame = tk.Frame(card, bg=bg_card) line_frame.pack(fill="x", padx=20) tk.Label(line_frame, text=c["hash"], bg=bg_card, fg=cyan, font=("Consolas", 9)).pack(side="left") tk.Label(line_frame, text=f" {c['message']}", bg=bg_card, fg=fg, font=("Segoe UI", 9), anchor="w").pack(side="left", fill="x") if len(res["commits"]) > 5: tk.Label(card, text=f" ... et {len(res['commits']) - 5} autres commits", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20) # Fichiers distants modifiés if res["files"]: tk.Label(card, text=f"Fichiers distants modifies ({len(res['files'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2)) color_map = {"A": green, "M": yellow, "D": red, "R": cyan} for f in res["files"][:8]: line_frame = tk.Frame(card, bg=bg_card) line_frame.pack(fill="x", padx=20) c = color_map.get(f["status_char"], fg) tk.Label(line_frame, text=f"[{f['status']:>9}]", bg=bg_card, fg=c, font=("Consolas", 9)).pack(side="left") tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left") if len(res["files"]) > 8: tk.Label(card, text=f" ... et {len(res['files']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20) # Fichiers locaux manquants if res["local_changes"]: tk.Label(card, text=f"Fichiers locaux a restaurer ({len(res['local_changes'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2)) for f in res["local_changes"][:8]: line_frame = tk.Frame(card, bg=bg_card) line_frame.pack(fill="x", padx=20) c = red if f["status_char"] == "D" else yellow tk.Label(line_frame, text=f"[{f['status']:>20}]", bg=bg_card, fg=c, font=("Consolas", 8)).pack(side="left") tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left") if len(res["local_changes"]) > 8: tk.Label(card, text=f" ... et {len(res['local_changes']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20) # Bouton d'action if not res["up_to_date"] and not res.get("error"): btn_frame = tk.Frame(card, bg=bg_card) btn_frame.pack(fill="x", padx=12, pady=(8, 0)) if res.get("needs_clone"): btn = ttk.Button(btn_frame, text="Cloner le depot", command=lambda r=res: self._do_clone(r)) else: btn = ttk.Button(btn_frame, text="Mettre a jour", command=lambda r=res: self._do_update(r)) btn.pack(side="left") # Stocker ref du bouton pour le désactiver après res["_btn"] = btn # Padding bas tk.Frame(card, bg=bg_card, height=10).pack() return card def _do_update(self, res): """Met à jour un dépôt (pull + restore).""" name = res["name"] log.info(f"[{name}] MAJ unitaire demandee") self._log_gui(f"[{name}] Mise a jour en cours...", "info") if "_btn" in res: res["_btn"].state(["disabled"]) def work(): messages = [] success = True local_path = res["local_path"] branch = res["branch"] tag_map = {"A": "file_add", "M": "file_mod", "D": "file_del", "R": "file_mod"} if res["commits"]: log.info(f"[{name}] Pull de {len(res['commits'])} commit(s)...") self._log_gui(f"[{name}] Telechargement de {len(res['commits'])} commit(s)...", "info") ok, out, err = do_pull(local_path, branch) if ok: msg = f"Pull OK : {len(res['commits'])} commits telecharges." log.info(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "success") # Logger chaque fichier distant for f in res.get("files", []): tag = tag_map.get(f["status_char"], "info") self._log_gui(f" [{f['status']:>9}] {f['file']}", tag) messages.append(msg) else: msg = f"Erreur pull : {err}" log.error(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "error") messages.append(msg) success = False if res["local_changes"]: log.info(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...") self._log_gui(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...", "info") ok, err = do_restore(local_path) if ok: msg = f"Restauration OK : {len(res['local_changes'])} fichiers restaures." log.info(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "success") # Logger chaque fichier restauré for f in res["local_changes"]: tag = tag_map.get(f["status_char"], "info") self._log_gui(f" [Restaure] {f['file']}", tag) messages.append(msg) else: msg = f"Erreur restauration : {err}" log.error(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "error") messages.append(msg) success = False status = "SUCCES" if success else "ECHEC" log.info(f"[{name}] MAJ unitaire terminee - {status}") self._log_gui(f"[{name}] Termine - {status}", "success" if success else "error") self.after(0, lambda: self._show_update_result(res, messages, success)) threading.Thread(target=work, daemon=True).start() def _do_clone(self, res): """Clone un dépôt.""" name = res["name"] log.info(f"[{name}] Clonage demande - {res['url']}") self._log_gui(f"[{name}] Clonage en cours depuis {res['url']}...", "info") if "_btn" in res: res["_btn"].state(["disabled"]) repo = {"url": res["url"], "path": res["relative_path"]} def work(): ok, err = do_clone(repo) if ok: msg = f"Depot '{name}' clone avec succes !" log.info(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "success") else: msg = f"Erreur de clonage : {err}" log.error(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "error") self.after(0, lambda: messagebox.showinfo("Clonage", msg)) self.after(100, self._start_check) threading.Thread(target=work, daemon=True).start() def _show_update_result(self, res, messages, success): title = "Mise a jour" if success else "Erreur" messagebox.showinfo(title, f"{res['name']}\n\n" + "\n".join(messages)) self._start_check() def _update_all(self): """Met à jour tous les dépôts qui ont des MAJ.""" to_update = [r for r in self.repo_results if not r["up_to_date"] and not r.get("error") and not r.get("needs_clone")] log.info(f"MAJ globale demandee - {len(to_update)} depot(s) a mettre a jour") self._log_gui(f"MAJ globale : {len(to_update)} depot(s) a mettre a jour", "warning") for res in to_update: self._do_update(res) def _show_no_repos(self): bg_card = "#313244" fg = "#cdd6f4" dim = "#6c7086" card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a") card.pack(fill="x", pady=5) tk.Label(card, text="Aucun depot configure", bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(padx=12, pady=(12, 5)) tk.Label(card, text="Edite config.ini pour ajouter des depots :", bg=bg_card, fg=dim, font=("Segoe UI", 9)).pack(padx=12) example = ( "[repo:MonRepo]\n" "url = http://192.168.1.235:3125/user/repo\n" "path = ../MonRepo" ) tk.Label(card, text=example, bg="#1e1e2e", fg="#a6e3a1", font=("Consolas", 9), justify="left", padx=10, pady=8).pack(padx=12, pady=8, fill="x") tk.Frame(card, bg=bg_card, height=10).pack() def _open_config(self): config_path = str(get_config_path()) if sys.platform == "win32": os.startfile(config_path) def _open_log(self): log_dir = str(get_exe_dir() / "log") if sys.platform == "win32": os.startfile(log_dir) if __name__ == "__main__": app = App() app.mainloop()