""" Git Update Checker - GUI multi-repo. Vérifie les mises à jour de plusieurs dépôts Git et propose de les télécharger. Accès lecture seule uniquement (fetch/pull/checkout, jamais de push). Tous les chemins sont relatifs à l'emplacement de l'exécutable. """ VERSION = "0.5.8" import subprocess import sys import os import configparser import logging import threading from concurrent.futures import ThreadPoolExecutor, as_completed import tkinter as tk from tkinter import ttk, messagebox from pathlib import Path from datetime import datetime import urllib.request import urllib.error # Forcer UTF-8 sur Windows if sys.platform == "win32": os.system("chcp 65001 >nul 2>&1") try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") except Exception: pass # ── Utilitaires ────────────────────────────────────────────────────────────── def get_exe_dir(): if getattr(sys, "frozen", False): return Path(sys.executable).parent return Path(__file__).parent def resolve_relative(path_str): """Résout un chemin relatif par rapport au dossier de l'exe.""" p = Path(path_str) if not p.is_absolute(): p = get_exe_dir() / p return p.resolve() # ── Logging ────────────────────────────────────────────────────────────────── def setup_logging(): """Configure le logging dans un dossier log/ à côté de l'exe.""" log_dir = get_exe_dir() / "log" log_dir.mkdir(exist_ok=True) log_file = log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.log" logger = logging.getLogger("GitUpdateChecker") logger.setLevel(logging.DEBUG) # Handler fichier fh = logging.FileHandler(log_file, encoding="utf-8") fh.setLevel(logging.DEBUG) fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")) logger.addHandler(fh) # Nettoyage des vieux logs (garder 30 jours) for old_log in sorted(log_dir.glob("*.log"))[:-30]: try: old_log.unlink() except OSError: pass return logger log = setup_logging() def run_git(args, cwd=None, timeout=30): # -c safe.directory=* : évite l'erreur "dubious ownership" sur clé USB cmd = ["git", "-c", "safe.directory=*"] + args log.debug(f"git {' '.join(args)} (cwd={cwd})") try: result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=timeout, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, ) if result.returncode != 0 and result.stderr.strip(): log.warning(f"git {args[0]} erreur: {result.stderr.strip()}") return result.returncode, result.stdout.strip(), result.stderr.strip() except FileNotFoundError: log.error("Git non trouve dans le PATH") return -1, "", "Git n'est pas installe ou pas dans le PATH." except subprocess.TimeoutExpired: log.error(f"Timeout: git {' '.join(args)}") return 1, "", "Timeout" # ── Auto-update du programme ───────────────────────────────────────────────── def _version_tuple(v): """Convertit '0.4' en (0, 4) pour comparaison.""" try: return tuple(int(x) for x in v.strip().split(".")) except (ValueError, AttributeError): return (0,) def _get_self_update_config(): """Lit la config [self-update] depuis config.ini. Retourne (url, exe_name, branch) ou (None, None, None).""" config_path = get_config_path() if not config_path.exists(): return None, None, None config = configparser.ConfigParser() config.read(config_path, encoding="utf-8") if not config.has_section("self-update"): return None, None, None url = config.get("self-update", "url", fallback="").strip().rstrip("/") exe_name = config.get("self-update", "exe_name", fallback="GitUpdateChecker.exe").strip() branch = config.get("self-update", "branch", fallback="master").strip() if not url: return None, None, None return url, exe_name, branch def check_self_update(): """ Vérifie si une nouvelle version est disponible sur le serveur Gitea. Télécharge version.txt via HTTP et compare avec VERSION locale. Retourne (needs_update: bool, info: str). """ repo_url, _, branch = _get_self_update_config() if not repo_url: log.info("Auto-update: pas de section [self-update] dans config.ini, skip") return False, "" log.info("Auto-update: verification via HTTP...") version_url = f"{repo_url}/raw/branch/{branch}/version.txt" try: req = urllib.request.Request(version_url, headers={"User-Agent": "GitUpdateChecker"}) with urllib.request.urlopen(req, timeout=10) as resp: remote_version = resp.read().decode("utf-8").strip() except (urllib.error.URLError, OSError) as e: log.warning(f"Auto-update: impossible de verifier la version distante: {e}") return False, f"Impossible de contacter le serveur: {e}" log.info(f"Auto-update: version locale={VERSION} distante={remote_version}") if _version_tuple(remote_version) <= _version_tuple(VERSION): log.info("Auto-update: programme a jour") return False, "" info = f"Version actuelle : {VERSION}\nVersion disponible : {remote_version}" log.info(f"Auto-update: MAJ disponible - {remote_version}") return True, info def do_self_update(): """ Télécharge le nouvel exe depuis le serveur Gitea. Stratégie : télécharger dans .new, renommer l'exe actuel en .old, placer le nouveau. Retourne (ok, message). """ repo_url, exe_name, branch = _get_self_update_config() if not repo_url: return False, "Configuration [self-update] manquante" is_frozen = getattr(sys, "frozen", False) if not is_frozen: log.warning("Auto-update: mode script, telechargement non supporte") return False, "Auto-update uniquement supporte en mode .exe" exe_path = Path(sys.executable) exe_old_path = exe_path.with_suffix(".exe.old") exe_new_path = exe_path.with_suffix(".exe.new") # Telecharger le nouvel exe download_url = f"{repo_url}/raw/branch/{branch}/{exe_name}" log.info(f"Auto-update: telechargement de {download_url}") try: req = urllib.request.Request(download_url, headers={"User-Agent": "GitUpdateChecker"}) total_bytes = 0 with urllib.request.urlopen(req, timeout=60) as resp, open(exe_new_path, "wb") as f: while True: chunk = resp.read(65536) # 64 Ko par bloc if not chunk: break f.write(chunk) total_bytes += len(chunk) if total_bytes < 1000: log.error(f"Auto-update: fichier telecharge trop petit ({total_bytes} octets)") exe_new_path.unlink(missing_ok=True) return False, "Le fichier telecharge semble invalide" log.info(f"Auto-update: telecharge {total_bytes} octets -> {exe_new_path.name}") except (urllib.error.URLError, OSError) as e: log.error(f"Auto-update: echec telechargement: {e}") if exe_new_path.exists(): try: exe_new_path.unlink() except OSError: pass return False, f"Erreur telechargement: {e}" # Renommer : exe actuel -> .old try: if exe_old_path.exists(): exe_old_path.unlink() exe_path.rename(exe_old_path) log.info(f"Auto-update: {exe_path.name} -> {exe_old_path.name}") except OSError as e: log.error(f"Auto-update: impossible de renommer l'exe: {e}") if exe_new_path.exists(): try: exe_new_path.unlink() except OSError: pass return False, f"Impossible de renommer l'exe: {e}" # Renommer : .new -> exe try: exe_new_path.rename(exe_path) log.info(f"Auto-update: {exe_new_path.name} -> {exe_path.name}") except OSError as e: log.error(f"Auto-update: impossible de placer le nouvel exe: {e}") try: exe_old_path.rename(exe_path) log.info("Auto-update: ancien exe restaure") except OSError: pass return False, f"Impossible de placer le nouvel exe: {e}" return True, "Mise a jour reussie !\nLe programme va redemarrer." def relaunch_program(): """Relance le programme (nouvel exe) et quitte le processus actuel.""" if getattr(sys, "frozen", False): exe_path = str(Path(sys.executable)) log.info(f"Auto-update: relance de {exe_path}") # Lancer un batch qui attend 1s puis lance le nouvel exe et supprime l'ancien .old bat_path = str(get_exe_dir() / "_update.bat") bat_content = ( f'@echo off\n' f'timeout /t 1 /nobreak >nul\n' f'start "" "{exe_path}"\n' f'del "{exe_path}.old" 2>nul\n' f'del "%~f0"\n' ) with open(bat_path, "w", encoding="mbcs") as f: f.write(bat_content) subprocess.Popen( ["cmd", "/c", bat_path], creationflags=subprocess.CREATE_NO_WINDOW, ) else: # Mode script : relancer python log.info("Auto-update: relance du script") subprocess.Popen([sys.executable, __file__]) # ── Configuration ──────────────────────────────────────────────────────────── def get_config_path(): return get_exe_dir() / "config.ini" def load_repos(): """Charge la liste des dépôts depuis config.ini.""" config_path = get_config_path() config = configparser.ConfigParser() repos = [] if not config_path.exists(): # Créer un config.ini exemple config["repo:Exemple"] = { "url": "http://192.168.1.235:3125/user/repo", "path": "../MonRepo", } with open(config_path, "w", encoding="utf-8") as f: f.write("; Configuration des depots Git a surveiller\n") f.write("; Les chemins (path) sont relatifs a l'emplacement de l'exe\n") f.write("; Ajouter autant de sections [repo:NomDuRepo] que necessaire\n\n") config.write(f) return repos config.read(config_path, encoding="utf-8") for section in config.sections(): if section.startswith("repo:"): name = section[5:] url = config.get(section, "url", fallback="").strip() path = config.get(section, "path", fallback="").strip() if url and path: repos.append({"name": name, "url": url, "path": path}) return repos # ── Logique Git ────────────────────────────────────────────────────────────── def check_repo(repo): """Vérifie un dépôt et retourne son état.""" name = repo["name"] url = repo["url"] local_path = str(resolve_relative(repo["path"])) log.info(f"[{name}] Verification - {url} -> {local_path}") result = { "name": name, "url": url, "local_path": local_path, "relative_path": repo["path"], "exists": False, "up_to_date": False, "error": None, "commits": [], "files": [], "local_changes": [], "local_only": False, "branch": "", "local_hash": "", "remote_hash": "", "needs_clone": False, "offline": False, } # Vérifier si le dossier existe et est un repo git if not os.path.isdir(os.path.join(local_path, ".git")): if os.path.isdir(local_path) and os.listdir(local_path): result["error"] = "Le dossier existe mais n'est pas un depot git." return result result["needs_clone"] = True return result result["exists"] = True # Branche courante code, branch, _ = run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=local_path) if code != 0: result["error"] = "Impossible de lire la branche courante." return result result["branch"] = branch # Mettre à jour l'URL origin si elle a changé dans config.ini run_git(["remote", "set-url", "origin", url], cwd=local_path) # Fetch (détecte aussi si le remote est inaccessible) code, _, err = run_git(["fetch", "origin"], cwd=local_path) if code != 0: offline_keywords = ["could not resolve", "connection refused", "unable to connect", "timed out", "the remote end hung up"] if any(kw in err.lower() for kw in offline_keywords): log.warning(f"[{name}] Remote inaccessible: {url}") result["error"] = "Depot hors ligne (remote inaccessible)" result["offline"] = True else: result["error"] = f"Erreur fetch : {err}" return result # Comparer HEAD local vs origin code, local_hash, _ = run_git(["rev-parse", "HEAD"], cwd=local_path) if code != 0: result["error"] = "Impossible de lire le commit local." return result code, remote_hash, _ = run_git(["rev-parse", f"origin/{branch}"], cwd=local_path) if code != 0: result["error"] = f"Impossible de trouver origin/{branch}." return result result["local_hash"] = local_hash[:8] result["remote_hash"] = remote_hash[:8] # Modifications locales code, local_diff, _ = run_git(["diff", "--name-status", "HEAD"], cwd=local_path) if code == 0 and local_diff: for line in local_diff.splitlines(): parts = line.split("\t", 1) if len(parts) == 2: sc = parts[0].strip() fn = parts[1].strip() status_map = {"D": "Supprime localement", "M": "Modifie localement"} result["local_changes"].append({ "status": status_map.get(sc[0], sc), "status_char": sc[0], "file": fn, }) if local_hash == remote_hash and not result["local_changes"]: log.info(f"[{name}] A jour ({local_hash[:8]})") result["up_to_date"] = True return result if local_hash == remote_hash: log.info(f"[{name}] {len(result['local_changes'])} fichier(s) locaux modifies/supprimes") result["local_only"] = True return result # Nouveaux commits distants code, log_out, _ = run_git( ["log", "--oneline", "--format=%h|%an|%ar|%s", f"HEAD..origin/{branch}"], cwd=local_path, ) if code == 0 and log_out: for line in log_out.splitlines(): parts = line.split("|", 3) if len(parts) == 4: result["commits"].append({ "hash": parts[0], "author": parts[1], "date": parts[2], "message": parts[3], }) # Fichiers distants modifiés code, diff_out, _ = run_git( ["diff", "--name-status", f"HEAD..origin/{branch}"], cwd=local_path, ) if code == 0 and diff_out: for line in diff_out.splitlines(): parts = line.split("\t", 1) if len(parts) == 2: sc = parts[0].strip() fn = parts[1].strip() status_map = {"A": "Ajoute", "M": "Modifie", "D": "Supprime", "R": "Renomme"} result["files"].append({ "status": status_map.get(sc[0], sc), "status_char": sc[0], "file": fn, }) return result def do_clone(repo): """Clone un dépôt.""" local_path = str(resolve_relative(repo["path"])) log.info(f"Clonage: {repo['url']} -> {local_path}") code, _, err = run_git(["clone", repo["url"], local_path], timeout=300) if code == 0: log.info(f"Clonage reussi: {local_path}") else: log.error(f"Clonage echoue: {err}") return code == 0, err def do_pull(local_path, branch): """Pull les mises à jour (lecture seule).""" log.info(f"Pull: {local_path} (branche {branch})") code, out, err = run_git(["pull", "origin", branch], cwd=local_path, timeout=120) if code == 0: log.info(f"Pull reussi: {local_path}") else: log.error(f"Pull echoue: {err}") return code == 0, out, err def do_restore(local_path): """Restaure les fichiers locaux modifiés/supprimés.""" log.info(f"Restauration: {local_path}") code, _, err = run_git(["checkout", "--", "."], cwd=local_path) if code == 0: log.info(f"Restauration reussie: {local_path}") else: log.error(f"Restauration echouee: {err}") return code == 0, err # ── Interface graphique ────────────────────────────────────────────────────── class App(tk.Tk): def __init__(self): super().__init__() self.title(f"Git Update Checker v{VERSION}") self.geometry("820x600") self.minsize(700, 450) self.configure(bg="#1e1e2e") self.repos_config = load_repos() self.repo_results = [] # Icône de la fenêtre (barre de titre + taskbar) self._app_icon = None icon_path = get_exe_dir() / "icon.png" if icon_path.exists(): try: self._app_icon = tk.PhotoImage(file=str(icon_path)) self.iconphoto(True, self._app_icon) except Exception: self._app_icon = None log.info(f"=== Demarrage Git Update Checker v{VERSION} ===") self._cleanup_old_exe() self._build_ui() self.after(100, self._check_self_update_then_repos) def _cleanup_old_exe(self): """Supprime l'ancien exe .old restant d'une mise a jour precedente.""" if getattr(sys, "frozen", False): old_path = Path(sys.executable).with_suffix(".exe.old") if old_path.exists(): try: old_path.unlink() log.info(f"Nettoyage: {old_path.name} supprime") except OSError: log.warning(f"Nettoyage: impossible de supprimer {old_path.name}") def _build_ui(self): style = ttk.Style(self) style.theme_use("clam") # Couleurs sombres bg = "#1e1e2e" fg = "#cdd6f4" accent = "#89b4fa" green = "#a6e3a1" yellow = "#f9e2af" red = "#f38ba8" style.configure("TFrame", background=bg) style.configure("TLabel", background=bg, foreground=fg, font=("Segoe UI", 10)) style.configure("Title.TLabel", background=bg, foreground=fg, font=("Segoe UI", 14, "bold")) style.configure("Status.TLabel", background=bg, foreground=accent, font=("Segoe UI", 9)) style.configure("TButton", font=("Segoe UI", 10)) style.configure("Green.TLabel", background=bg, foreground=green, font=("Segoe UI", 10, "bold")) style.configure("Yellow.TLabel", background=bg, foreground=yellow, font=("Segoe UI", 10, "bold")) style.configure("Red.TLabel", background=bg, foreground=red, font=("Segoe UI", 10, "bold")) # Header header = ttk.Frame(self) header.pack(fill="x", padx=15, pady=(15, 5)) # Icône dans le coin haut gauche (redimensionnée à 32x32) if self._app_icon: try: icon_small = tk.PhotoImage(file=str(get_exe_dir() / "icon.png")).subsample( max(1, self._app_icon.width() // 32), max(1, self._app_icon.height() // 32), ) self._icon_small = icon_small # garder la référence ttk.Label(header, image=icon_small, style="TLabel").pack(side="left", padx=(0, 8)) except Exception: pass ttk.Label(header, text=f"Git Update Checker v{VERSION}", style="Title.TLabel").pack(side="left") self.status_label = ttk.Label(header, text="Verification en cours...", style="Status.TLabel") self.status_label.pack(side="right") # Date (mise a jour chaque seconde) self.date_label = ttk.Label(self, text="", style="Status.TLabel") self.date_label.pack(anchor="w", padx=15) self._tick_clock() # Panneau principal (PanedWindow vertical : cartes en haut, log en bas) paned = ttk.PanedWindow(self, orient="vertical") paned.pack(fill="both", expand=True, padx=15, pady=10) # Zone scrollable pour les repos container = ttk.Frame(paned) paned.add(container, weight=3) self.canvas = tk.Canvas(container, bg=bg, highlightthickness=0) scrollbar = ttk.Scrollbar(container, orient="vertical", command=self.canvas.yview) self.scroll_frame = ttk.Frame(self.canvas) self.scroll_frame.bind("", lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))) self.canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw") self.canvas.configure(yscrollcommand=scrollbar.set) self.canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Scroll avec la molette : cible le bon widget selon la position du curseur self.bind_all("", self._on_mousewheel) # Panneau de log en bas log_frame = tk.Frame(paned, bg="#181825") paned.add(log_frame, weight=1) log_header = tk.Frame(log_frame, bg="#181825") log_header.pack(fill="x", padx=8, pady=(6, 2)) tk.Label(log_header, text="Journal des operations", bg="#181825", fg="#a6adc8", font=("Segoe UI", 9, "bold")).pack(side="left") tk.Button(log_header, text="Effacer", bg="#313244", fg="#cdd6f4", bd=0, font=("Segoe UI", 8), command=self._clear_log_panel, activebackground="#45475a", activeforeground="#cdd6f4").pack(side="right") self.log_text = tk.Text(log_frame, bg="#11111b", fg="#cdd6f4", font=("Consolas", 9), height=8, bd=0, highlightthickness=0, wrap="word", state="disabled", padx=8, pady=4) log_scroll = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview) self.log_text.configure(yscrollcommand=log_scroll.set) self.log_text.pack(side="left", fill="both", expand=True, padx=(8, 0), pady=(0, 8)) log_scroll.pack(side="right", fill="y", padx=(0, 8), pady=(0, 8)) # Tags couleur pour le log GUI self.log_text.tag_configure("info", foreground="#cdd6f4") self.log_text.tag_configure("success", foreground="#a6e3a1") self.log_text.tag_configure("warning", foreground="#f9e2af") self.log_text.tag_configure("error", foreground="#f38ba8") self.log_text.tag_configure("file_add", foreground="#a6e3a1") self.log_text.tag_configure("file_mod", foreground="#f9e2af") self.log_text.tag_configure("file_del", foreground="#f38ba8") self.log_text.tag_configure("dim", foreground="#6c7086") # Boutons en bas btn_frame = ttk.Frame(self) btn_frame.pack(fill="x", padx=15, pady=(0, 15)) self.btn_refresh = ttk.Button(btn_frame, text="Rafraichir", command=self._start_check) self.btn_refresh.pack(side="left", padx=(0, 10)) self.btn_update_all = ttk.Button(btn_frame, text="Tout mettre a jour", command=self._update_all) self.btn_update_all.pack(side="left") self.btn_update_all.state(["disabled"]) ttk.Button(btn_frame, text="Ouvrir config.ini", command=self._open_config).pack(side="right") self.btn_open_log = ttk.Button(btn_frame, text="Ouvrir les logs", command=self._open_log) self.btn_open_log.pack(side="right", padx=(0, 10)) def _tick_clock(self): """Met à jour le label de date chaque seconde.""" self.date_label.configure(text=datetime.now().strftime(" %d/%m/%Y %H:%M:%S")) self.after(1000, self._tick_clock) def _on_mousewheel(self, event): """Scroll le bon widget selon où se trouve la souris.""" w = event.widget while w is not None: if w is self.canvas or w is self.scroll_frame: self.canvas.yview_scroll(int(-1 * (event.delta / 120)), "units") return if w is self.log_text: self.log_text.yview_scroll(int(-1 * (event.delta / 120)), "units") return try: w = w.master except AttributeError: break def _check_self_update_then_repos(self): """Vérifie d'abord la MAJ du programme, puis les repos.""" self.status_label.configure(text="Verification auto-update...") def work(): needs, info = check_self_update() self.after(0, lambda: self._handle_self_update(needs, info)) threading.Thread(target=work, daemon=True).start() def _log_gui(self, message, tag="info"): """Ajoute une ligne dans le panneau de log.""" def _append(): self.log_text.configure(state="normal") timestamp = datetime.now().strftime("%H:%M:%S") self.log_text.insert("end", f"[{timestamp}] ", "dim") self.log_text.insert("end", f"{message}\n", tag) self.log_text.see("end") self.log_text.configure(state="disabled") # Appel thread-safe if threading.current_thread() is threading.main_thread(): _append() else: self.after(0, _append) def _clear_log_panel(self): self.log_text.configure(state="normal") self.log_text.delete("1.0", "end") self.log_text.configure(state="disabled") def _handle_self_update(self, needs_update, info): """Gère le résultat de l'auto-update.""" if needs_update: answer = messagebox.askyesno( "Mise a jour du programme", f"Une mise a jour du programme est disponible !\n\n{info}\n\nMettre a jour maintenant ?", ) if answer: self.status_label.configure(text="Mise a jour du programme...") def work(): ok, msg = do_self_update() self.after(0, lambda: self._self_update_done(ok, msg)) threading.Thread(target=work, daemon=True).start() return self._start_check() def _self_update_done(self, ok, msg): """Callback après auto-update.""" if ok: messagebox.showinfo("Auto-update", msg) log.info("Auto-update appliquee, relance...") relaunch_program() self.destroy() return else: messagebox.showwarning("Auto-update", msg) self._start_check() def _start_check(self): """Lance la vérification dans un thread.""" self.btn_refresh.state(["disabled"]) self.btn_update_all.state(["disabled"]) self.status_label.configure(text="Verification en cours...") self._log_gui("Verification des depots...", "info") self.repos_config = load_repos() if not self.repos_config: self._clear_cards() self.status_label.configure(text="Aucun depot configure") self._show_no_repos() self.btn_refresh.state(["!disabled"]) return threading.Thread(target=self._check_all, daemon=True).start() def _check_all(self): """Vérifie tous les repos en parallèle (max 4 à la fois).""" total = len(self.repos_config) results = [None] * total done = 0 with ThreadPoolExecutor(max_workers=4) as executor: futures = {} for i, repo in enumerate(self.repos_config): self._log_gui(f"[{repo['name']}] Verification...", "dim") futures[executor.submit(check_repo, repo)] = i for future in as_completed(futures): idx = futures[future] res = future.result() results[idx] = res done += 1 # Résumé concis selon le statut if res.get("offline"): self._log_gui(f"[{res['name']}] Hors ligne", "warning") elif res.get("error"): self._log_gui(f"[{res['name']}] Erreur : {res['error']}", "error") elif res.get("needs_clone"): self._log_gui(f"[{res['name']}] A cloner", "warning") elif res["up_to_date"]: self._log_gui(f"[{res['name']}] A jour", "success") else: count = len(res["commits"]) + len(res["local_changes"]) self._log_gui(f"[{res['name']}] {count} changement(s) disponible(s)", "warning") self.after(0, lambda d=done, t=total: self.status_label.configure(text=f"Verification {d}/{t}...")) self.repo_results = results self.after(0, self._display_results) def _display_results(self): """Affiche les résultats dans la GUI.""" self._clear_cards() has_any_updates = False for res in self.repo_results: card = self._create_card(res) card.pack(fill="x", pady=5) if not res["up_to_date"] and not res.get("error") and not res.get("offline") and not res.get("needs_clone"): has_any_updates = True total = len(self.repo_results) up = sum(1 for r in self.repo_results if r["up_to_date"]) log.info(f"Resultat: {up}/{total} depots a jour") self._log_gui(f"Verification terminee : {up}/{total} depots a jour", "success" if up == total else "warning") self.status_label.configure(text=f"{up}/{total} depots a jour") self.btn_refresh.state(["!disabled"]) if has_any_updates: self.btn_update_all.state(["!disabled"]) def _clear_cards(self): for w in self.scroll_frame.winfo_children(): w.destroy() def _create_card(self, res): """Crée une carte pour un dépôt.""" bg_card = "#313244" fg = "#cdd6f4" green = "#a6e3a1" yellow = "#f9e2af" red = "#f38ba8" cyan = "#89dceb" dim = "#6c7086" card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a") # Header de la carte top = tk.Frame(card, bg=bg_card) top.pack(fill="x", padx=12, pady=(10, 5)) tk.Label(top, text=res["name"], bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(side="left") if res.get("offline"): tk.Label(top, text="HORS LIGNE", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right") elif res.get("error"): tk.Label(top, text="ERREUR", bg=bg_card, fg=red, font=("Segoe UI", 9, "bold")).pack(side="right") elif res.get("needs_clone"): tk.Label(top, text="A CLONER", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right") elif res["up_to_date"]: tk.Label(top, text="A JOUR", bg=bg_card, fg=green, font=("Segoe UI", 9, "bold")).pack(side="right") else: count = len(res["commits"]) + len(res["local_changes"]) tk.Label(top, text=f"MAJ DISPO ({count})", bg=bg_card, fg=yellow, font=("Segoe UI", 9, "bold")).pack(side="right") # Infos info = tk.Frame(card, bg=bg_card) info.pack(fill="x", padx=12) tk.Label(info, text=f"Chemin : {res['relative_path']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w") if res["branch"]: tk.Label(info, text=f"Branche : {res['branch']} | {res['local_hash']}", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w") # Erreur if res.get("error"): tk.Label(card, text=f" {res['error']}", bg=bg_card, fg=red, font=("Segoe UI", 9), anchor="w").pack(fill="x", padx=12, pady=(5, 0)) # Commits distants if res["commits"]: tk.Label(card, text=f"Nouveaux commits ({len(res['commits'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(8, 2)) for c in res["commits"][:5]: line_frame = tk.Frame(card, bg=bg_card) line_frame.pack(fill="x", padx=20) tk.Label(line_frame, text=c["hash"], bg=bg_card, fg=cyan, font=("Consolas", 9)).pack(side="left") tk.Label(line_frame, text=f" {c['message']}", bg=bg_card, fg=fg, font=("Segoe UI", 9), anchor="w").pack(side="left", fill="x") if len(res["commits"]) > 5: tk.Label(card, text=f" ... et {len(res['commits']) - 5} autres commits", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20) # Fichiers distants modifiés if res["files"]: tk.Label(card, text=f"Fichiers distants modifies ({len(res['files'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2)) color_map = {"A": green, "M": yellow, "D": red, "R": cyan} for f in res["files"][:8]: line_frame = tk.Frame(card, bg=bg_card) line_frame.pack(fill="x", padx=20) c = color_map.get(f["status_char"], fg) tk.Label(line_frame, text=f"[{f['status']:>9}]", bg=bg_card, fg=c, font=("Consolas", 9)).pack(side="left") tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left") if len(res["files"]) > 8: tk.Label(card, text=f" ... et {len(res['files']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20) # Fichiers locaux manquants if res["local_changes"]: tk.Label(card, text=f"Fichiers locaux a restaurer ({len(res['local_changes'])}) :", bg=bg_card, fg=fg, font=("Segoe UI", 9, "bold"), anchor="w").pack(fill="x", padx=12, pady=(5, 2)) for f in res["local_changes"][:8]: line_frame = tk.Frame(card, bg=bg_card) line_frame.pack(fill="x", padx=20) c = red if f["status_char"] == "D" else yellow tk.Label(line_frame, text=f"[{f['status']:>20}]", bg=bg_card, fg=c, font=("Consolas", 8)).pack(side="left") tk.Label(line_frame, text=f" {f['file']}", bg=bg_card, fg=fg, font=("Segoe UI", 9)).pack(side="left") if len(res["local_changes"]) > 8: tk.Label(card, text=f" ... et {len(res['local_changes']) - 8} autres fichiers", bg=bg_card, fg=dim, font=("Segoe UI", 8)).pack(anchor="w", padx=20) # Bouton d'action if not res["up_to_date"] and not res.get("error"): btn_frame = tk.Frame(card, bg=bg_card) btn_frame.pack(fill="x", padx=12, pady=(8, 0)) if res.get("needs_clone"): btn = ttk.Button(btn_frame, text="Cloner le depot", command=lambda r=res: self._do_clone(r)) else: btn = ttk.Button(btn_frame, text="Mettre a jour", command=lambda r=res: self._do_update(r)) btn.pack(side="left") # Stocker ref du bouton pour le désactiver après res["_btn"] = btn # Padding bas tk.Frame(card, bg=bg_card, height=10).pack() return card def _do_update(self, res, batch=False): """Met à jour un dépôt (pull + restore).""" name = res["name"] log.info(f"[{name}] MAJ unitaire demandee") self._log_gui(f"[{name}] Mise a jour en cours...", "info") if "_btn" in res: res["_btn"].state(["disabled"]) def work(): messages = [] success = True local_path = res["local_path"] branch = res["branch"] tag_map = {"A": "file_add", "M": "file_mod", "D": "file_del", "R": "file_mod"} if res["commits"]: log.info(f"[{name}] Pull de {len(res['commits'])} commit(s)...") self._log_gui(f"[{name}] Telechargement de {len(res['commits'])} commit(s)...", "info") ok, out, err = do_pull(local_path, branch) if ok: msg = f"Pull OK : {len(res['commits'])} commits telecharges." log.info(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "success") # Logger chaque fichier distant for f in res.get("files", []): tag = tag_map.get(f["status_char"], "info") self._log_gui(f" [{f['status']:>9}] {f['file']}", tag) messages.append(msg) else: msg = f"Erreur pull : {err}" log.error(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "error") messages.append(msg) success = False if res["local_changes"]: log.info(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...") self._log_gui(f"[{name}] Restauration de {len(res['local_changes'])} fichier(s)...", "info") ok, err = do_restore(local_path) if ok: msg = f"Restauration OK : {len(res['local_changes'])} fichiers restaures." log.info(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "success") # Logger chaque fichier restauré for f in res["local_changes"]: tag = tag_map.get(f["status_char"], "info") self._log_gui(f" [Restaure] {f['file']}", tag) messages.append(msg) else: msg = f"Erreur restauration : {err}" log.error(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "error") messages.append(msg) success = False status = "SUCCES" if success else "ECHEC" log.info(f"[{name}] MAJ unitaire terminee - {status}") self._log_gui(f"[{name}] Termine - {status}", "success" if success else "error") self.after(0, lambda: self._show_update_result(res, messages, success, batch=batch)) threading.Thread(target=work, daemon=True).start() def _do_clone(self, res): """Clone un dépôt.""" name = res["name"] log.info(f"[{name}] Clonage demande - {res['url']}") self._log_gui(f"[{name}] Clonage en cours depuis {res['url']}...", "info") if "_btn" in res: res["_btn"].state(["disabled"]) repo = {"url": res["url"], "path": res["relative_path"]} def work(): ok, err = do_clone(repo) if ok: msg = f"Depot '{name}' clone avec succes !" log.info(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "success") else: msg = f"Erreur de clonage : {err}" log.error(f"[{name}] {msg}") self._log_gui(f"[{name}] {msg}", "error") self.after(0, lambda: messagebox.showinfo("Clonage", msg)) self.after(100, self._start_check) threading.Thread(target=work, daemon=True).start() def _show_update_result(self, res, messages, success, batch=False): if batch: # En mode batch : pas de messagebox individuelle, on ne rafraichit qu'une fois à la fin self._batch_remaining -= 1 if self._batch_remaining == 0: self._start_check() else: title = "Mise a jour" if success else "Erreur" messagebox.showinfo(title, f"{res['name']}\n\n" + "\n".join(messages)) self._start_check() def _update_all(self): """Met à jour tous les dépôts qui ont des MAJ.""" to_update = [r for r in self.repo_results if not r["up_to_date"] and not r.get("error") and not r.get("needs_clone")] if not to_update: return log.info(f"MAJ globale demandee - {len(to_update)} depot(s) a mettre a jour") self._log_gui(f"MAJ globale : {len(to_update)} depot(s) a mettre a jour", "warning") # Compteur pour n'appeler _start_check qu'une seule fois quand tous sont termines self._batch_remaining = len(to_update) for res in to_update: self._do_update(res, batch=True) def _show_no_repos(self): bg_card = "#313244" fg = "#cdd6f4" dim = "#6c7086" card = tk.Frame(self.scroll_frame, bg=bg_card, bd=0, highlightthickness=1, highlightbackground="#45475a") card.pack(fill="x", pady=5) tk.Label(card, text="Aucun depot configure", bg=bg_card, fg=fg, font=("Segoe UI", 11, "bold")).pack(padx=12, pady=(12, 5)) tk.Label(card, text="Edite config.ini pour ajouter des depots :", bg=bg_card, fg=dim, font=("Segoe UI", 9)).pack(padx=12) example = ( "[repo:MonRepo]\n" "url = http://192.168.1.235:3125/user/repo\n" "path = ../MonRepo" ) tk.Label(card, text=example, bg="#1e1e2e", fg="#a6e3a1", font=("Consolas", 9), justify="left", padx=10, pady=8).pack(padx=12, pady=8, fill="x") tk.Frame(card, bg=bg_card, height=10).pack() def _open_config(self): config_path = str(get_config_path()) if sys.platform == "win32": os.startfile(config_path) def _open_log(self): log_dir = str(get_exe_dir() / "log") if sys.platform == "win32": os.startfile(log_dir) if __name__ == "__main__": app = App() app.mainloop()