Files
Gestion_sondes/app/visualiseur_logs.py
2025-09-11 15:14:43 +02:00

274 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# visualiseur_logs.py
import os
from datetime import datetime
import streamlit as st
# Dépendances SSH & .env
try:
import paramiko
except ImportError:
paramiko = None
try:
from dotenv import load_dotenv
except ImportError:
load_dotenv = None
# =========================
# CONFIG & CHARGEMENT .ENV
# =========================
st.set_page_config(page_title="Visualiseur de Logs (VPS, password)", layout="wide")
st.title("🧾 Visualiseur de fichiers logs (VPS)")
# Chemins possibles du .env
CANDIDATE_ENV = [
"/home/debian/Gestion_sondes/.env",
os.path.join(os.path.expanduser("~"), "Gestion_sondes", ".env"),
".env",
]
ENV_LOADED_FROM = None
if load_dotenv:
for p in CANDIDATE_ENV:
if os.path.isfile(p):
load_dotenv(p)
ENV_LOADED_FROM = p
break
# Valeurs par défaut (surchageables par .env)
VPS_HOST = os.getenv("VPS_HOST", "app.domo91.fr")
VPS_PORT = int(os.getenv("VPS_PORT", "22") or "22")
VPS_USER = os.getenv("VPS_USER", "debian")
VPS_PASSWORD = os.getenv("VPS_PASSWORD", "")
VPS_LOG_DIR = os.getenv("VPS_LOG_DIR", "/home/debian/Gestion_sondes/Logs")
# =========================
# BARRE LATÉRALE
# =========================
with st.sidebar:
st.header("⚙️ Configuration")
if not load_dotenv:
st.caption("Astuce : installe python-dotenv pour charger automatiquement un .env")
st.code("pip install python-dotenv", language="bash")
st.caption(f".env chargé depuis : `{ENV_LOADED_FROM or 'non trouvé'}`")
VPS_HOST = st.text_input("Hôte VPS", value=VPS_HOST)
VPS_PORT = st.number_input("Port", value=VPS_PORT, min_value=1, max_value=65535, step=1)
VPS_USER = st.text_input("Utilisateur", value=VPS_USER)
VPS_LOG_DIR = st.text_input("Dossier des logs (VPS)", value=VPS_LOG_DIR)
VPS_PASSWORD = st.text_input("Mot de passe (VPS)", value=VPS_PASSWORD, type="password")
st.markdown("---")
st.caption("Connexion par **mot de passe** uniquement (pas de clé).")
# =========================
# FONCTIONS SSH (password)
# =========================
def ssh_connect_password(host, port, user, password):
"""Retourne un client SSH connecté (password)."""
if paramiko is None:
raise RuntimeError("Paramiko nest pas installé. Fais : pip install paramiko")
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=host, port=int(port), username=user, password=password, timeout=10)
return ssh
def list_logs_over_ssh(ssh, log_dir):
"""
Retourne une liste triée (par mtime desc) de dicts :
[{'name': 'f.log', 'mtime': 1726140001.0, 'size': 12345}, ...]
Utilise 'find' + 'printf' pour robustesse et performance.
"""
# -maxdepth 1 : uniquement dans le dossier
# -type f -name "*.log" : fichiers .log
# -printf "%T@ %s %f\n" : mtime_epoch size filename
cmd = f"bash -lc \"find '{log_dir}' -maxdepth 1 -type f -name '*.log' -printf '%T@ %s %f\\n' 2>/dev/null | sort -nr\""
_, stdout, stderr = ssh.exec_command(cmd)
err = stderr.read().decode(errors="ignore").strip()
out = stdout.read().decode(errors="ignore")
# Pas bloquant si 'find' écrit sur stderr pour des warnings, on ignore
logs = []
for line in out.splitlines():
line = line.strip()
if not line:
continue
parts = line.split(" ", 2)
if len(parts) < 3:
continue
try:
mtime = float(parts[0])
size = int(parts[1])
name = parts[2]
logs.append({"name": name, "mtime": mtime, "size": size})
except Exception:
continue
return logs
def read_tail_over_ssh(ssh, remote_path, n_lines):
"""
Lit les N dernières lignes via 'tail -n' (rapide pour gros logs).
Retourne str (contenu brut).
"""
n = max(1, int(n_lines))
cmd = f"bash -lc \"tail -n {n} '{remote_path}'\""
_, stdout, stderr = ssh.exec_command(cmd)
err = stderr.read().decode(errors="ignore")
if err and "No such file" in err:
raise FileNotFoundError(err)
return stdout.read().decode(errors="ignore")
def backup_and_truncate_remote(ssh, remote_path):
"""
Crée une sauvegarde horodatée .bak et tronque le fichier à 0 octet.
Retourne le chemin de la sauvegarde.
"""
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
bak = f"{remote_path}.{ts}.bak"
# cp (backup) + truncate (ou redirection fallback)
cmd = (
f"bash -lc \"cp '{remote_path}' '{bak}' 2>/dev/null || true; "
f"truncate -s 0 '{remote_path}' 2>/dev/null || : > '{remote_path}'\""
)
_, stdout, stderr = ssh.exec_command(cmd)
_ = stdout.read()
err = stderr.read().decode(errors="ignore").strip()
# On tolère la plupart des messages ; on remonte seulement si 'No such file'
if "No such file" in err:
raise FileNotFoundError(err)
return bak
# =========================
# CONTRÔLE DES CHAMPS AVANT CONNEXION
# =========================
if not VPS_HOST or not VPS_USER or not VPS_LOG_DIR:
st.error("Renseigne au minimum Hôte, Utilisateur et Dossier des logs.")
st.stop()
if not VPS_PASSWORD:
st.error("Renseigne le mot de passe pour lauthentification.")
st.stop()
# =========================
# CONNEXION & LISTE DES LOGS
# =========================
try:
ssh = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
except Exception as e:
st.error(f"❌ Connexion SSH échouée : {e}")
st.stop()
try:
logs = list_logs_over_ssh(ssh, VPS_LOG_DIR)
except Exception as e:
ssh.close()
st.error(f"❌ Impossible de lister les logs : {e}")
st.stop()
if not logs:
ssh.close()
st.warning("Aucun fichier *.log trouvé dans ce dossier sur le VPS.")
st.stop()
# Combobox des fichiers, triés par mtime desc
fichiers = [item["name"] for item in logs]
choix = st.selectbox("📂 Sélectionnez un fichier log (VPS) :", fichiers, index=0)
log_info = next((x for x in logs if x["name"] == choix), logs[0])
log_path = f"{VPS_LOG_DIR.rstrip('/')}/{choix}"
# Infos de bandeau
mtime_dt = datetime.fromtimestamp(log_info["mtime"])
st.caption(
f"VPS {VPS_USER}@{VPS_HOST} • `{choix}` — "
f"Taille : {log_info['size']:,} octets — Modifié le : {mtime_dt:%Y-%m-%d %H:%M:%S}"
)
# =========================
# OPTIONS D'AFFICHAGE
# =========================
col1, col2 = st.columns([1, 1])
with col1:
filtre_erreurs = st.checkbox(
"🔍 Afficher uniquement les erreurs",
value=False,
help="Filtre sur ERROR, ❌, Traceback, failed, exception"
)
with col2:
nb_lignes = st.slider("📏 Nombre de lignes à afficher", 10, 5000, 300)
# =========================
# LECTURE DU CONTENU (tail)
# =========================
try:
# Lis légèrement plus que demandé quand on filtre, pour avoir assez de matière
marge = 300 if filtre_erreurs else 0
content = read_tail_over_ssh(ssh, log_path, nb_lignes + marge)
lignes = content.splitlines(keepends=True)
except Exception as e:
ssh.close()
st.error(f"Impossible de lire le fichier : {e}")
st.stop()
# On peut fermer ici (les actions rouvriront une connexion fraîche)
ssh.close()
# Filtre "erreurs"
if filtre_erreurs:
mots_cles = ["error", "", "traceback", "failed", "exception"]
lignes = [l for l in lignes if any(m in l.lower() for m in mots_cles)]
# Affichage
dernieres = lignes[-nb_lignes:]
st.text_area("📄 Contenu du fichier log :", "".join(dernieres), height=600)
st.divider()
# =========================
# ACTIONS : BACKUP + VIDAGE
# =========================
st.subheader("🧰 Actions sur ce fichier (VPS)")
colA, colB, colC = st.columns([1, 1, 2])
with colA:
faire_backup = st.checkbox(
"Créer une copie .bak avant vidage",
value=True,
help="Copie horodatée placée à côté du fichier."
)
with colB:
confirmation = st.checkbox("Je confirme vouloir vider", value=False)
with colC:
vider = st.button("🧹 Vider ce log", use_container_width=True)
if vider:
if not confirmation:
st.warning("❗ Coche dabord « Je confirme vouloir vider » pour éviter les erreurs.")
else:
try:
ssh2 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
with ssh2:
if faire_backup:
bak_path = backup_and_truncate_remote(ssh2, log_path)
st.info(f"📦 Copie de sauvegarde créée : `{bak_path}`")
else:
# Troncature seule
cmd = f"bash -lc \"truncate -s 0 '{log_path}' 2>/dev/null || : > '{log_path}'\""
_, out, err = ssh2.exec_command(cmd)
_ = out.read(); _ = err.read()
st.success("✅ Fichier vidé avec succès.")
st.rerun()
except PermissionError:
st.error("⛔ Permission refusée. Vérifie les droits sur le fichier/dossier.")
except FileNotFoundError as e:
st.error(f"📁 Fichier ou dossier introuvable : {e}")
except Exception as e:
st.error(f"❌ Échec du vidage : {e}")