# visualiseur_logs.py # Dépendances: streamlit, paramiko # pip install streamlit paramiko import html import time from datetime import datetime import streamlit as st try: import paramiko except ImportError: paramiko = None # ========================= # CONFIG — À RENSEIGNER # ========================= VPS_HOST = "162.19.78.131" # ← mets ton IP/DNS VPS_PORT = 22 # ← port SSH VPS_USER = "debian" # ← utilisateur VPS_PASSWORD = "lpZwixbBUFtGY" # ← mot de passe VPS_LOG_DIR = "/home/debian/Gestion_sondes/Logs" # ← dossier des logs sur le VPS # ========================= # UI # ========================= st.set_page_config(page_title="Visualiseur de Logs (VPS, password)", layout="wide") st.title("🧾 Visualiseur de fichiers logs (VPS)") st.caption(f"Cible : {VPS_USER}@{VPS_HOST}:{VPS_PORT} • Dossier logs : {VPS_LOG_DIR}") if paramiko is None: st.error("Paramiko n’est pas installé. Exécute : pip install paramiko") st.stop() # Barre latérale : options d’affichage & refresh with st.sidebar: st.header("⚙️ Options") auto_refresh = st.toggle("🔄 Rafraîchissement auto", value=False, key="auto_refresh") refresh_interval = st.slider("Intervalle (secondes)", 2, 60, 5, key="refresh_interval") if st.button("Rafraîchir maintenant"): st.rerun() # ========================= # FONCTIONS SSH # ========================= def ssh_connect_password(host, port, user, password): """Retourne un client SSH connecté (password).""" ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=host, port=int(port), username=user, password=password, timeout=10) return ssh def list_logs_over_ssh(ssh, log_dir): """ Liste les *.log du dossier (sans sous-dossiers), triés par mtime desc. Retourne [{'name': 'f.log', 'mtime': 1234567890.0, 'size': 1024}, ...] """ cmd = ( f"bash -lc \"find '{log_dir}' -maxdepth 1 -type f -name '*.log' " f"-printf '%T@ %s %f\\n' 2>/dev/null | sort -nr\"" ) _, stdout, stderr = ssh.exec_command(cmd) _ = stderr.read().decode(errors="ignore") # on ignore les warnings find out = stdout.read().decode(errors="ignore") items = [] for line in out.splitlines(): parts = line.strip().split(" ", 2) if len(parts) < 3: continue try: mtime = float(parts[0]); size = int(parts[1]); name = parts[2] items.append({"name": name, "mtime": mtime, "size": size}) except Exception: continue return items def read_tail_over_ssh(ssh, remote_path, n_lines): """Lit les N dernières lignes via 'tail -n' (rapide sur gros logs).""" n = max(1, int(n_lines)) cmd = f"bash -lc \"tail -n {n} '{remote_path}'\"" _, stdout, stderr = ssh.exec_command(cmd) err = stderr.read().decode(errors="ignore") if "No such file" in err: raise FileNotFoundError(err) return stdout.read().decode(errors="ignore") def backup_and_truncate_remote(ssh, remote_path): """Crée une sauvegarde horodatée .bak et tronque le fichier à 0 octet. Retourne le chemin .bak.""" ts = datetime.now().strftime("%Y%m%d-%H%M%S") bak = f"{remote_path}.{ts}.bak" cmd = ( f"bash -lc \"cp '{remote_path}' '{bak}' 2>/dev/null || true; " f"truncate -s 0 '{remote_path}' 2>/dev/null || : > '{remote_path}'\"" ) _, out, err = ssh.exec_command(cmd) _ = out.read() e = err.read().decode(errors="ignore").strip() if "No such file" in e: raise FileNotFoundError(e) return bak # ========================= # CONNEXION & LISTE # ========================= if not all([VPS_HOST, VPS_USER, VPS_PASSWORD, VPS_LOG_DIR]): st.error("Complète les constantes en haut du fichier (hôte/utilisateur/mot de passe/dossier logs).") st.stop() try: ssh = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD) except Exception as e: st.error(f"❌ Connexion SSH échouée : {e}") st.stop() try: logs = list_logs_over_ssh(ssh, VPS_LOG_DIR) except Exception as e: ssh.close() st.error(f"❌ Impossible de lister les logs : {e}") st.stop() if not logs: ssh.close() st.warning("Aucun fichier *.log trouvé dans ce dossier sur le VPS.") st.stop() fichiers = [x["name"] for x in logs] choix = st.selectbox("📂 Sélectionnez un fichier log :", fichiers, index=0) log_info = next((x for x in logs if x["name"] == choix), logs[0]) log_path = f"{VPS_LOG_DIR.rstrip('/')}/{choix}" mtime_dt = datetime.fromtimestamp(log_info["mtime"]) st.caption(f"`{choix}` — Taille : {log_info['size']:,} o — Modifié : {mtime_dt:%Y-%m-%d %H:%M:%S}") # ========================= # OPTIONS & LECTURE # ========================= col1, col2, col3 = st.columns([1, 1, 1.2]) with col1: filtre_erreurs = st.checkbox( "🔍 Afficher uniquement les erreurs", value=False, help="Filtre sur ERROR, ❌, Traceback, failed, exception, critical, fatal" ) with col2: nb_lignes = st.slider("📏 Lignes à afficher", 10, 5000, 300) with col3: highlight = st.checkbox( "🖍️ Surligner erreurs/avertissements", value=True, help="Met en évidence ERROR/CRITICAL/EXCEPTION (rouge) et WARN (jaune)" ) # Lecture (on prend une marge quand filtre actif) try: marge = 300 if filtre_erreurs else 0 content = read_tail_over_ssh(ssh, log_path, nb_lignes + marge) lignes = content.splitlines(keepends=True) except Exception as e: ssh.close() st.error(f"Impossible de lire le fichier : {e}") st.stop() # On peut fermer maintenant (les actions rouvriront une session propre) ssh.close() # Filtrage if filtre_erreurs: err_keys = ["error", "traceback", "failed", "exception", "critical", "fatal"] lignes = [l for l in lignes if any(k in l.lower() for k in err_keys)] dernieres = lignes[-nb_lignes:] # Surlignage def colorize(lines): out = [] for l in lines: low = l.lower() style = ( "font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace;" "white-space: pre-wrap; margin: 0; padding: 2px 6px; border-radius: 4px;" ) bg = None if any(k in low for k in ["error", "traceback", "failed", "exception", "critical", "fatal", "❌"]): bg = "#ffe6e6" # rouge clair elif "warn" in low: bg = "#fff8e1" # jaune clair if bg: style += f"background:{bg};" out.append(f"