Files
Outils/app/Logs.py
2025-11-12 13:25:26 +01:00

226 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Logs.py
# Minimal SSH log viewer (Streamlit)
# - Configuration is read from .env only
# - SSH connection by password (SSH_KEY_PATH is read and displayed, but not used to authenticate)
# - File listing, tail of the last N lines, search, download
import os
import shlex
import stat
import time
from typing import Optional, Tuple

import paramiko
import streamlit as st
from dotenv import load_dotenv
# ================
# .env CONFIG
# ================
load_dotenv()  # no explicit path: python-dotenv locates the .env file by itself

VPS_HOST = os.getenv("SSH_HOST", "").strip()
# `or 22` also covers a variable that is present but empty ("SSH_PORT="),
# which would otherwise crash the whole page on int("").
VPS_PORT = int((os.getenv("SSH_PORT") or "").strip() or 22)
VPS_USER = os.getenv("SSH_USER", "").strip()
VPS_PASSWORD = os.getenv("SSH_PASSWORD", "")
VPS_KEY_PATH = os.getenv("SSH_KEY_PATH", "").strip()
# Trailing slash removed so the directory can be joined with "/<file name>".
VPS_LOG_DIR = os.getenv("SSH_LOG_DIR", "/home/debian/Gestion_sondes/Logs").rstrip("/")
# ================
# UTILITAIRES
# ================
# --- Password-only connection ---
@st.cache_resource(show_spinner=False)
def get_ssh_client_password_only() -> paramiko.SSHClient:
    """Open an SSH connection authenticated by password only.

    Connection settings are read from the environment (SSH_HOST, SSH_USER,
    SSH_PORT, SSH_PASSWORD). The function is wrapped in ``st.cache_resource``,
    so the returned client is shared between script reruns: callers must not
    close it without also clearing the cache entry.

    Returns:
        A connected ``paramiko.SSHClient`` whose transport is active.

    Raises:
        RuntimeError: if a required setting is missing, if SSH_PORT is not an
            integer, or if the connection cannot be established.
    """
    host = os.getenv("SSH_HOST", "").strip()
    user = os.getenv("SSH_USER", "").strip()
    pwd = os.getenv("SSH_PASSWORD", "")
    if not host or not user or not pwd:
        raise RuntimeError("SSH_HOST / SSH_USER / SSH_PASSWORD manquant(s) dans .env")

    # An empty "SSH_PORT=" entry falls back to the default port instead of
    # failing on int("").
    raw_port = (os.getenv("SSH_PORT") or "").strip() or "22"
    try:
        port = int(raw_port)
    except ValueError as e:
        raise RuntimeError(f"SSH_PORT invalide dans .env : {raw_port!r}") from e

    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts any unknown host key without
    # verification; consider loading known host keys and rejecting unknown ones.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # Password authentication is forced: no agent, no key discovery.
    try:
        client.connect(
            hostname=host, port=port, username=user,
            password=pwd, timeout=10, allow_agent=False, look_for_keys=False
        )
        # Make sure the transport is really up before handing the client out.
        transport = client.get_transport()
        if not transport or not transport.is_active():
            raise RuntimeError("Transport SSH inactif après connexion.")
    except Exception as e:
        # Release the socket/transport of the failed attempt before reporting.
        client.close()
        raise RuntimeError(f"Échec connexion SSH par mot de passe : {e}") from e
    return client
# --- Safe SFTP opening ---
# Get the (cached) SSH client; any failure is shown to the user and the
# script run is stopped.
try:
    client = get_ssh_client_password_only()
except Exception as e:
    st.error(str(e))
    st.stop()

try:
    sftp = client.open_sftp()
except Exception as e:
    st.error(f"Erreur ouverture SFTP : {e}")
    # The client comes from st.cache_resource: drop the cache entry so the
    # next rerun reconnects instead of receiving this unusable client again.
    get_ssh_client_password_only.clear()
    try:
        client.close()
    except Exception:
        # Best-effort cleanup of an already broken connection.
        pass
    st.stop()
def list_remote_files(sftp: paramiko.SFTPClient, directory: str) -> list[Tuple[str, int, float]]:
    """List the regular files of a remote directory, newest first.

    Returns a list of ``(name, size_bytes, mtime_epoch)`` tuples sorted by
    modification time in descending order. Entries that are not regular
    files are ignored. When the listing fails, the error is reported through
    ``st.error`` and an empty list is returned.
    """
    try:
        regular_files = [
            (attr.filename, attr.st_size, attr.st_mtime)
            for attr in sftp.listdir_attr(directory)
            if stat.S_ISREG(attr.st_mode)
        ]
    except FileNotFoundError:
        st.error(f"❌ Dossier introuvable côté serveur : {directory}")
        return []
    except Exception as e:
        st.error(f"❌ Impossible de lister {directory} : {e}")
        return []
    # Index 2 of each tuple is the modification time.
    return sorted(regular_files, key=lambda record: record[2], reverse=True)
def read_remote_text_tail(
    client: paramiko.SSHClient, path: str, lines: int = 500, grep: str | None = None
) -> str:
    """Return the last ``lines`` lines of a remote file, optionally filtered.

    The work is done on the server through a single shell command:
    ``tail -n`` for plain files, ``zcat | tail -n`` for ``.gz`` files, and an
    optional case-insensitive ``grep`` applied to that tail.

    Args:
        client: connected SSH client used to run the command.
        path: remote file path; paths ending in ``.gz`` are decompressed.
        lines: number of trailing lines to keep (values below 1 become 1).
        grep: optional pattern passed to ``grep -i``; empty/None disables it.

    Returns:
        The command output decoded as UTF-8 (invalid bytes replaced). If the
        command printed nothing on stdout but wrote to stderr, the stderr
        content is returned prefixed with ``[stderr] ``.
    """
    count = max(1, int(lines))
    # shlex.quote neutralises every shell metacharacter ($(), backticks, ;, ...),
    # not only double quotes; "--" stops a leading "-" being read as an option.
    quoted_path = shlex.quote(path)
    if path.endswith(".gz"):
        command = f"zcat -- {quoted_path} | tail -n {count}"
    else:
        command = f"tail -n {count} -- {quoted_path}"

    if grep:
        # grep is the last stage of one pipeline, so it filters the tail itself.
        # "|| true" keeps "no match" (grep exit status 1) from looking like a failure.
        command = f"{command} | grep -i -e {shlex.quote(grep)} || true"

    _stdin, stdout, stderr = client.exec_command(command, timeout=20)
    out = stdout.read()
    err = stderr.read()

    text = (out or b"").decode("utf-8", errors="replace")
    if err and not text:
        # Only surface stderr when there is nothing else to show.
        text = (b"[stderr] " + err).decode("utf-8", errors="replace")
    return text
def download_remote_file(sftp: paramiko.SFTPClient, path: str) -> bytes:
    """Read the remote file at ``path`` in binary mode and return all of its bytes."""
    with sftp.file(path, "rb") as remote_file:
        payload = remote_file.read()
    return payload
# ================
# STREAMLIT UI
# ================
st.set_page_config(page_title="Visualiseur de Logs", layout="wide")
st.title("📜 Visualiseur de logs (SSH)")

with st.expander("Configuration (lecture seule)", expanded=False):
    st.code(
        f"HOST={VPS_HOST}\nUSER={VPS_USER}\nPORT={VPS_PORT}\n"
        f"KEY_PATH={VPS_KEY_PATH or '-'}\nLOG_DIR={VPS_LOG_DIR}",
        language="bash"
    )

# `client` and `sftp` are the module-level objects opened earlier in this
# script. They are reused as-is: reconnecting here went through an undefined
# helper and opened a second SFTP channel, leaking the first one.
try:
    with st.sidebar:
        st.header("🔎 Options")
        refresh = st.button("🔄 Rafraîchir la liste")
        default_tail = st.number_input("Dernières lignes (tail)", min_value=50, max_value=100_000, value=500, step=50)
        search_term = st.text_input("Recherche (grep -i)", value="", placeholder="ex: ERROR | Timeout | sonde")
        st.caption("Astuce : la recherche applique un grep insensible à la casse sur le résultat.")

    # Simple refresh: the directory listing below is redone after the click.
    if refresh:
        time.sleep(0.2)

    files = list_remote_files(sftp, VPS_LOG_DIR)
    if not files:
        # Nothing to show; the `finally` clause below still closes the channel.
        st.stop()

    # Simple summary (name, size, date)
    col1, col2 = st.columns([2, 1])
    with col1:
        names = [f[0] for f in files]
        choice = st.selectbox("Fichiers disponibles (triés par date desc.)", names, index=0)
    with col2:
        # Details of the selected file
        chosen = next((f for f in files if f[0] == choice), None)
        if chosen:
            size_kb = chosen[1] / 1024
            mtime_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(chosen[2]))
            st.metric("Taille (KB)", f"{size_kb:,.0f}".replace(",", " "))
            st.metric("Modifié le", mtime_str)

    full_path = f"{VPS_LOG_DIR}/{choice}"

    # Actions
    place1, place2, _ = st.columns([1, 1, 6])
    with place1:
        do_show = st.button("👁️ Afficher")
    with place2:
        do_dl = st.button("⬇️ Télécharger")

    # Display
    if do_show:
        try:
            with st.spinner("Lecture du fichier distant..."):
                text = read_remote_text_tail(
                    client,
                    full_path,
                    lines=int(default_tail),
                    grep=(search_term or None)
                )
            st.text_area(f"Contenu : {choice}", text, height=600)
        except Exception as e:
            # Same reporting style as the download branch below.
            st.error(f"❌ Échec de la lecture : {e}")

    # Download
    if do_dl:
        try:
            data = download_remote_file(sftp, full_path)
            st.download_button(
                label=f"Télécharger {choice}",
                data=data,
                file_name=choice,
                mime="application/octet-stream"
            )
        except Exception as e:
            st.error(f"❌ Échec du téléchargement : {e}")
finally:
    # Clean shutdown of the per-run SFTP channel, even after st.stop() or an
    # error. The SSH client itself is NOT closed: it is cached by
    # st.cache_resource and handed back on the next rerun.
    sftp.close()