Files
Gestion_sondes/Outils/visualiseur_logs.py

299 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# visualiseur_logs.py
# Dépendances: streamlit, paramiko
# pip install streamlit paramiko
import html
import time
from datetime import datetime
import streamlit as st
try:
import paramiko
except ImportError:
paramiko = None
# =========================
# CONFIG — À RENSEIGNER
# =========================
VPS_HOST = "162.19.78.131" # ← mets ton IP/DNS
VPS_PORT = 22 # ← port SSH
VPS_USER = "debian" # ← utilisateur
VPS_PASSWORD = "lpZwixbBUFtGY" # ← mot de passe
VPS_LOG_DIR = "/home/debian/Gestion_sondes/Logs" # ← dossier des logs sur le VPS
# =========================
# UI
# =========================
st.set_page_config(page_title="Visualiseur de Logs (VPS, password)", layout="wide")
st.title("🧾 Visualiseur de fichiers logs (VPS)")
st.caption(f"Cible : {VPS_USER}@{VPS_HOST}:{VPS_PORT} • Dossier logs : {VPS_LOG_DIR}")
if paramiko is None:
st.error("Paramiko nest pas installé. Exécute : pip install paramiko")
st.stop()
# Barre latérale : options daffichage & refresh
with st.sidebar:
st.header("⚙️ Options")
auto_refresh = st.toggle("🔄 Rafraîchissement auto", value=False, key="auto_refresh")
refresh_interval = st.slider("Intervalle (secondes)", 2, 60, 5, key="refresh_interval")
if st.button("Rafraîchir maintenant"):
st.rerun()
# =========================
# FONCTIONS SSH
# =========================
def ssh_connect_password(host, port, user, password):
"""Retourne un client SSH connecté (password)."""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=host, port=int(port), username=user, password=password, timeout=10)
return ssh
def list_logs_over_ssh(ssh, log_dir):
"""
Liste les *.log du dossier (sans sous-dossiers), triés par mtime desc.
Retourne [{'name': 'f.log', 'mtime': 1234567890.0, 'size': 1024}, ...]
"""
cmd = (
f"bash -lc \"find '{log_dir}' -maxdepth 1 -type f -name '*.log' "
f"-printf '%T@ %s %f\\n' 2>/dev/null | sort -nr\""
)
_, stdout, stderr = ssh.exec_command(cmd)
_ = stderr.read().decode(errors="ignore") # on ignore les warnings find
out = stdout.read().decode(errors="ignore")
items = []
for line in out.splitlines():
parts = line.strip().split(" ", 2)
if len(parts) < 3:
continue
try:
mtime = float(parts[0]); size = int(parts[1]); name = parts[2]
items.append({"name": name, "mtime": mtime, "size": size})
except Exception:
continue
return items
def read_tail_over_ssh(ssh, remote_path, n_lines):
"""Lit les N dernières lignes via 'tail -n' (rapide sur gros logs)."""
n = max(1, int(n_lines))
cmd = f"bash -lc \"tail -n {n} '{remote_path}'\""
_, stdout, stderr = ssh.exec_command(cmd)
err = stderr.read().decode(errors="ignore")
if "No such file" in err:
raise FileNotFoundError(err)
return stdout.read().decode(errors="ignore")
def backup_and_truncate_remote(ssh, remote_path):
"""Crée une sauvegarde horodatée .bak et tronque le fichier à 0 octet. Retourne le chemin .bak."""
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
bak = f"{remote_path}.{ts}.bak"
cmd = (
f"bash -lc \"cp '{remote_path}' '{bak}' 2>/dev/null || true; "
f"truncate -s 0 '{remote_path}' 2>/dev/null || : > '{remote_path}'\""
)
_, out, err = ssh.exec_command(cmd)
_ = out.read()
e = err.read().decode(errors="ignore").strip()
if "No such file" in e:
raise FileNotFoundError(e)
return bak
# =========================
# CONNEXION & LISTE
# =========================
if not all([VPS_HOST, VPS_USER, VPS_PASSWORD, VPS_LOG_DIR]):
st.error("Complète les constantes en haut du fichier (hôte/utilisateur/mot de passe/dossier logs).")
st.stop()
try:
ssh = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
except Exception as e:
st.error(f"❌ Connexion SSH échouée : {e}")
st.stop()
try:
logs = list_logs_over_ssh(ssh, VPS_LOG_DIR)
except Exception as e:
ssh.close()
st.error(f"❌ Impossible de lister les logs : {e}")
st.stop()
if not logs:
ssh.close()
st.warning("Aucun fichier *.log trouvé dans ce dossier sur le VPS.")
st.stop()
fichiers = [x["name"] for x in logs]
choix = st.selectbox("📂 Sélectionnez un fichier log :", fichiers, index=0)
log_info = next((x for x in logs if x["name"] == choix), logs[0])
log_path = f"{VPS_LOG_DIR.rstrip('/')}/{choix}"
mtime_dt = datetime.fromtimestamp(log_info["mtime"])
st.caption(f"`{choix}` — Taille : {log_info['size']:,} o — Modifié : {mtime_dt:%Y-%m-%d %H:%M:%S}")
# =========================
# OPTIONS & LECTURE
# =========================
col1, col2, col3 = st.columns([1, 1, 1.2])
with col1:
filtre_erreurs = st.checkbox(
"🔍 Afficher uniquement les erreurs",
value=False,
help="Filtre sur ERROR, ❌, Traceback, failed, exception, critical, fatal"
)
with col2:
nb_lignes = st.slider("📏 Lignes à afficher", 10, 5000, 300)
with col3:
highlight = st.checkbox(
"🖍️ Surligner erreurs/avertissements",
value=True,
help="Met en évidence ERROR/CRITICAL/EXCEPTION (rouge) et WARN (jaune)"
)
# Lecture (on prend une marge quand filtre actif)
try:
marge = 300 if filtre_erreurs else 0
content = read_tail_over_ssh(ssh, log_path, nb_lignes + marge)
lignes = content.splitlines(keepends=True)
except Exception as e:
ssh.close()
st.error(f"Impossible de lire le fichier : {e}")
st.stop()
# On peut fermer maintenant (les actions rouvriront une session propre)
ssh.close()
# Filtrage
if filtre_erreurs:
err_keys = ["error", "traceback", "failed", "exception", "critical", "fatal"]
lignes = [l for l in lignes if any(k in l.lower() for k in err_keys)]
dernieres = lignes[-nb_lignes:]
# Surlignage
def colorize(lines):
out = []
for l in lines:
low = l.lower()
style = (
"font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace;"
"white-space: pre-wrap; margin: 0; padding: 2px 6px; border-radius: 4px;"
)
bg = None
if any(k in low for k in ["error", "traceback", "failed", "exception", "critical", "fatal", ""]):
bg = "#ffe6e6" # rouge clair
elif "warn" in low:
bg = "#fff8e1" # jaune clair
if bg:
style += f"background:{bg};"
out.append(f"<div style='{style}'>{html.escape(l)}</div>")
return "\n".join(out)
if highlight:
st.markdown(colorize(dernieres), unsafe_allow_html=True)
else:
st.text_area("📄 Contenu du fichier log :", "".join(dernieres), height=600)
st.divider()
# =========================
# ACTIONS (Backup + Vidage)
# =========================
st.subheader("🧰 Actions sur ce fichier (VPS)")
colA, colB, colC = st.columns([1, 1, 2])
with colA:
faire_backup = st.checkbox("Créer une copie .bak avant vidage", value=True,
help="Copie horodatée à côté du fichier.")
with colB:
confirmation = st.checkbox("Je confirme vouloir vider", value=False)
with colC:
vider = st.button("🧹 Vider ce log", use_container_width=True)
if vider:
if not confirmation:
st.warning("❗ Coche dabord « Je confirme vouloir vider » pour éviter les erreurs.")
else:
try:
ssh2 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
with ssh2:
if faire_backup:
bak_path = backup_and_truncate_remote(ssh2, log_path)
st.info(f"📦 Copie de sauvegarde créée : `{bak_path}`")
else:
cmd = f"bash -lc \"truncate -s 0 '{log_path}' 2>/dev/null || : > '{log_path}'\""
_, out, err = ssh2.exec_command(cmd)
_ = out.read(); _ = err.read()
st.success("✅ Fichier vidé avec succès.")
st.rerun()
except PermissionError:
st.error("⛔ Permission refusée. Vérifie les droits sur le fichier/dossier.")
except FileNotFoundError as e:
st.error(f"📁 Fichier/dossier introuvable : {e}")
except Exception as e:
st.error(f"❌ Échec du vidage : {e}")
# =========================
# PURGE DE PLUSIEURS LOGS
# =========================
st.subheader("🗑️ Purge de plusieurs logs (VPS)")
# On reliste tous les logs disponibles
try:
ssh3 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
all_logs = list_logs_over_ssh(ssh3, VPS_LOG_DIR)
ssh3.close()
except Exception as e:
all_logs = []
st.error(f"Impossible de relister les logs : {e}")
if all_logs:
age_jours = st.number_input("Supprimer les fichiers plus vieux que (jours)", 1, 365, 7)
now = time.time()
candidats = [x for x in all_logs if (now - x["mtime"]) / 86400 >= age_jours]
if not candidats:
st.info("Aucun fichier ne correspond au filtre d'âge.")
else:
st.write(f"📂 {len(candidats)} fichier(s) plus vieux que {age_jours} jours trouvé(s).")
selection = []
for x in candidats:
label = f'{x["name"]}{x["size"]/1024:.1f} Ko — {datetime.fromtimestamp(x["mtime"]).strftime("%Y-%m-%d %H:%M")}'
if st.checkbox(label, key=f"purge_{x['name']}"):
selection.append(x["name"])
if selection:
st.warning(f"{len(selection)} fichier(s) sélectionné(s) pour suppression définitive.")
confirm = st.checkbox("Je confirme la suppression définitive", key="confirm_purge")
if st.button("❌ Supprimer les fichiers sélectionnés", type="primary", disabled=not confirm):
try:
ssh4 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
for fname in selection:
remote_path = f"{VPS_LOG_DIR.rstrip('/')}/{fname}"
cmd = f"bash -lc \"rm -f '{remote_path}'\""
_, out, err = ssh4.exec_command(cmd)
_ = out.read(); _ = err.read()
ssh4.close()
st.success(f"{len(selection)} fichier(s) supprimé(s).")
st.rerun()
except Exception as e:
st.error(f"Erreur lors de la suppression : {e}")
else:
st.info("Aucun log trouvé pour la purge.")
# =========================
# AUTO-REFRESH (fin de script)
# =========================
if auto_refresh:
# Affiche une petite info et relance la page après X secondes
st.caption(f"🔄 Rafraîchissement auto activé — Prochaine mise à jour dans {refresh_interval} s…")
time.sleep(refresh_interval)
st.rerun()