Remise en état des fichiers Gestion_sondes

This commit is contained in:
2025-09-20 15:59:21 +02:00
parent 511e377dc8
commit 2fa848b4a7
11 changed files with 236 additions and 671 deletions

236
Outils/Injection_tests.py Normal file
View File

@@ -0,0 +1,236 @@
import os
import datetime as dt
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
import mysql.connector as mc
# ----------------------
# Config de la page
# ----------------------
st.set_page_config(page_title="Injection de données de test", page_icon="🧪", layout="centered")
st.title("🧪 Injecteur de relevés de test (Sondes)")
st.caption("Crée ~10 lignes au-dessus d'un seuil pour tester les alertes")
# ----------------------
# Connexion MySQL depuis .env
# ----------------------
@st.cache_resource(show_spinner=False)
def get_connection():
load_dotenv()
try:
cnx = mc.connect(
host=os.getenv("DB_HOST"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASS"),
database=os.getenv("DB_NAME"),
autocommit=True,
)
return cnx
except Exception as e:
st.error(f"Échec de connexion MySQL : {e}")
raise
# ----------------------
# Helpers : liste des sondes actives et hors entretien
# ----------------------
@st.cache_data(ttl=60, show_spinner=False)
def list_sondes(site: str) -> list:
"""Retourne la liste des sondes actives (Etat=ON) et non en entretien pour le site.
Essaie d'abord monitor_{site}, puis Chambres_froides, sinon fallback via la table de mesures.
"""
cnx = get_connection()
cur = cnx.cursor()
# 1) monitor_{site}
try:
cur.execute(
f"""
SELECT Sonde
FROM `monitor_{site}`
WHERE (Etat='ON' OR Etat=1)
AND ( (Maintenance='OFF') OR (Maintenance=0) OR (Maintenance IS NULL) )
ORDER BY Sonde
"""
)
rows = [r[0] for r in cur.fetchall()]
if rows:
return rows
except Exception:
pass
# 2) Chambres_froides
try:
cur.execute(
"""
SELECT Sonde
FROM `Chambres_froides`
WHERE Lieu=%s AND (Etat='ON' OR Etat=1)
AND ( (Maintenance='OFF') OR (Maintenance=0) OR (Maintenance IS NULL) )
ORDER BY Sonde
""",
(site,)
)
rows = [r[0] for r in cur.fetchall()]
if rows:
return rows
except Exception:
pass
# 3) Fallback : dernier état via table de mesures (distinct)
try:
cur.execute(
f"SELECT DISTINCT Sonde FROM `{site}` ORDER BY Sonde LIMIT 200"
)
return [r[0] for r in cur.fetchall()]
except Exception:
return []
# ----------------------
# Helper : récupérer Temp_Max pour une sonde donnée
# ----------------------
@st.cache_data(ttl=60, show_spinner=False)
def get_temp_max(site: str, sonde: str):
"""Retourne Temp_Max pour (site, sonde) en cherchant d'abord dans monitor_{site}, puis Chambres_froides.
Renvoie None si non trouvé."""
try:
cnx = get_connection()
cur = cnx.cursor()
# 1) monitor_{site}
try:
cur.execute(
f"SELECT Temp_Max FROM `monitor_{site}` WHERE Sonde=%s LIMIT 1",
(sonde,)
)
row = cur.fetchone()
if row and row[0] is not None:
return float(row[0])
except Exception:
pass
# 2) Chambres_froides
try:
cur.execute(
"""
SELECT Temp_Max
FROM `Chambres_froides`
WHERE Lieu=%s AND Sonde=%s
LIMIT 1
""",
(site, sonde)
)
row = cur.fetchone()
if row and row[0] is not None:
return float(row[0])
except Exception:
pass
except Exception:
return None
return None
# ----------------------
# UI paramètres
# ----------------------
with st.sidebar:
st.header("Paramètres")
site = st.selectbox("Site (table)", ["Saclay", "Meudon"], index=0)
# Sélecteur de sonde depuis la liste active / hors entretien
options_sondes = list_sondes(site)
if not options_sondes:
st.warning("Aucune sonde active trouvée (ou table monitor introuvable). Vous pouvez saisir un nom manuel.")
sonde = st.text_input("Nom de la sonde", value="TEST_Chambre1")
else:
sonde = st.selectbox("Sonde (actives, hors entretien)", options_sondes)
st.subheader("Température")
# Auto-remplissage du seuil depuis la base et verrouillage par défaut
_temp_db = get_temp_max(site, sonde)
if _temp_db is None:
st.warning("Temp_Max introuvable en base ; valeur par défaut 6.0°C.")
_temp_db = 6.0
allow_edit = st.checkbox("Autoriser la modification du seuil", value=False)
temp_max = st.number_input("Seuil (Temp_Max)", value=float(_temp_db), step=0.1, disabled=not allow_edit)
delta = st.number_input("Delta au-dessus du seuil", value=1.0, step=0.1)
absolute_override = st.checkbox("Définir une température absolue à la place")
absolute_temp = st.number_input(
"Température absolue (si coché)", value=12.5, step=0.1, disabled=not absolute_override
)
st.subheader("Série temporelle")
rows = st.number_input("Nombre de points", min_value=1, max_value=200, value=10, step=1)
step_min = st.number_input("Pas (minutes)", min_value=1, max_value=120, value=5, step=1)
start_offset = st.number_input("Début : il y a (minutes)", min_value=0, max_value=1440, value=45, step=5)
st.markdown("---")
st.caption("Nettoyage rapide")
cleanup_scope = st.selectbox("Supprimer", ["Cette sonde", "Toutes les TEST_ des dernières 24h"])
do_cleanup = st.button("🧹 Supprimer les données de test")
col1, col2 = st.columns(2)
# ----------------------
# Actions
# ----------------------
if col1.button("🚀 Injecter les données"):
try:
cnx = get_connection()
cur = cnx.cursor()
# Calcul des timestamps et de la valeur
now = dt.datetime.now()
t0 = now - dt.timedelta(minutes=int(start_offset))
if absolute_override:
value = float(absolute_temp)
else:
value = float(temp_max) + float(delta)
# Préparation batch INSERT
sql = f"INSERT INTO `{site}` (Sonde, Temperature, Date) VALUES (%s,%s,%s)"
batch = []
for i in range(int(rows)):
ts = t0 + dt.timedelta(minutes=i * int(step_min))
batch.append((sonde, value, ts.strftime("%Y-%m-%d %H:%M:%S")))
cur.executemany(sql, batch)
st.success(f"{len(batch)} lignes insérées dans `{site}` pour **{sonde}** à **{value}°C**.")
# Aperçu des données insérées
cur.execute(
f"""
SELECT Id, Sonde, Temperature, Date
FROM `{site}`
WHERE Sonde = %s AND Date >= %s AND Date <= %s
ORDER BY Date DESC
LIMIT 50
""",
(
sonde,
(t0 - dt.timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S"),
(t0 + dt.timedelta(minutes=int(rows)*int(step_min) + 1)).strftime("%Y-%m-%d %H:%M:%S"),
),
)
rows_preview = cur.fetchall()
if rows_preview:
df = pd.DataFrame(rows_preview, columns=["Id", "Sonde", "Temperature", "Date"])
st.dataframe(df, use_container_width=True, hide_index=True)
else:
st.info("Aucune ligne trouvée pour l'aperçu (vérifiez les filtres/horaires).")
except Exception as e:
st.error(f"Erreur lors de l'injection : {e}")
# Nettoyage
def cleanup():
cnx = get_connection()
cur = cnx.cursor()
if cleanup_scope == "Cette sonde":
cur.execute(f"DELETE FROM `{site}` WHERE Sonde = %s", (sonde,))
st.success(f"Données supprimées pour la sonde **{sonde}** dans `{site}`.")
else:
cur.execute(
f"DELETE FROM `{site}` WHERE Sonde LIKE 'TEST\_%' ESCAPE '\\' AND Date >= NOW() - INTERVAL 1 DAY"
)
st.success(f"Toutes les sondes **TEST_** des dernières 24h supprimées dans `{site}`.")
if do_cleanup:
try:
cleanup()
except Exception as e:
st.error(f"Erreur de nettoyage : {e}")

342
Outils/tracker.py Normal file
View File

@@ -0,0 +1,342 @@
# tracker.py
# -------------------------------------------------------------
# Streamlit — Gestion de la table MySQL Sondes.tracker (avec address_hyphen)
# -------------------------------------------------------------
# Schéma attendu :
# id (PK), address (ROM {0x..}), address_hyphen (28-..-..-..-..-..-..-..),
# lieu, repere, mise_en_service (DATE), res_bits, date (timestamp)
# Authentification intégrée via .env (AUTH_USERS JSON)
# -------------------------------------------------------------
import os
import re
import time
import json
import hmac
from typing import Optional
from datetime import date
import pandas as pd
import streamlit as st
import mysql.connector as mysql
from contextlib import contextmanager
from dotenv import load_dotenv
# ==========================
# Configuration / Constantes
# ==========================
load_dotenv() # lit .env si présent
TABLE_NAME = "tracker"
COL_ID = "id"
COL_ADDRESS = "address" # format ROM : {0x28,0xFF,...}
COL_ADDR_HYPHEN = "address_hyphen" # format hyphen : 28-xx-xx-xx-xx-xx-xx-xx
COL_LIEU = "lieu"
COL_REPERE = "repere"
COL_MES = "mise_en_service" # DATE
COL_RESBITS = "res_bits"
COL_DATE = "date"
# Configuration BDD (standardisée sur les variables d'env MYSQL_*)
DB_CFG = dict(
host=os.getenv("DB_HOST"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASS"),
database=os.getenv("DB_NAME"),
port=int(os.getenv("MYSQL_PORT", "3306")),
)
# Regex d'une ROM code DS18B20 au format {0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0xCRC}
ROM_REGEX = re.compile(r"^{(?:0x[0-9A-Fa-f]{2},){7}0x[0-9A-Fa-f]{2}}$")
# Adresse hyphen : 8 octets hexa séparés par des tirets
HYPHEN_REGEX = re.compile(r"^[0-9A-Fa-f]{2}(?:-[0-9A-Fa-f]{2}){7}$")
# Mapping résolution DS18B20 (bits -> infos)
RES_MAP = {
9: {"precision": 0.5, "tconv_ms": 94},
10: {"precision": 0.25, "tconv_ms": 188},
11: {"precision": 0.125, "tconv_ms": 375},
12: {"precision": 0.0625,"tconv_ms": 750},
}
# ==================
# Authentification via .env (AUTH_USERS)
# ==================
AUTH_USERS_RAW = os.getenv("AUTH_USERS", "[]")
def _load_users() -> dict:
try:
data = json.loads(AUTH_USERS_RAW)
return {str(d.get("user", "")).strip(): str(d.get("pass", "")) for d in data if d.get("user") and d.get("pass")}
except Exception:
return {}
USERS = _load_users()
def _constant_time_equals(a: str, b: str) -> bool:
return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))
def verify_credentials(username: str, password: str) -> bool:
if not username or not password:
return False
expected = USERS.get(username.strip())
if expected is None:
return False
return _constant_time_equals(password, expected)
def require_login() -> Optional[str]:
if st.session_state.get("auth_ok") and st.session_state.get("auth_user"):
return st.session_state.get("auth_user")
st.markdown("<h2 style='text-align:center;'>🔒 Tracker</h2>", unsafe_allow_html=True)
_, col2, _ = st.columns([1, 2, 1])
with col2:
with st.form("login_form", clear_on_submit=False):
username = st.text_input("Utilisateur")
password = st.text_input("Mot de passe", type="password")
ok = st.form_submit_button("Se connecter")
if ok:
if verify_credentials(username, password):
st.session_state["auth_ok"] = True
st.session_state["auth_user"] = username.strip()
st.success("Connexion réussie.")
time.sleep(0.3)
st.rerun()
else:
st.error("Identifiants invalides.")
st.stop()
# ==================
# Accès Base de Données
# ==================
@contextmanager
def get_conn():
conn = mysql.connect(**DB_CFG)
try:
yield conn
finally:
conn.close()
# -----------------
# Utilitaires
# -----------------
def rom_help() -> str:
return (
"Format ROM attendu : `{0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0x12}` "
"(8 octets en hex). Le premier octet est souvent 0x28 pour DS18B20."
)
def is_valid_rom(address: str) -> bool:
return bool(ROM_REGEX.match(str(address).strip()))
def is_valid_hyphen(address_h: str) -> bool:
return bool(HYPHEN_REGEX.match(str(address_h).strip()))
def rom_to_hyphen(rom: str) -> str:
hexes = re.findall(r"0x([0-9A-Fa-f]{2})", str(rom))
if len(hexes) != 8:
return ""
return "-".join(h.lower() for h in hexes)
def hyphen_to_rom(h: str) -> str:
parts = str(h).strip().split("-")
if len(parts) != 8 or not all(re.fullmatch(r"[0-9A-Fa-f]{2}", p) for p in parts):
return ""
return "{" + ",".join(f"0x{p.upper()}" for p in parts) + "}"
def res_label(bits: int) -> str:
info = RES_MAP.get(bits)
if not info:
return f"{bits} bits (inconnu)"
return f"{bits} bits (±{info['precision']}°C, {info['tconv_ms']} ms)"
# -----------------
# Fonctions SQL
# -----------------
def fetch_trackers(where_lieu: str | None = None) -> pd.DataFrame:
query = (
f"SELECT {COL_ID}, {COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, "
f"{COL_REPERE}, {COL_MES}, {COL_RESBITS}, {COL_DATE} "
f"FROM {TABLE_NAME}"
)
params = []
if where_lieu:
query += f" WHERE {COL_LIEU} = %s"
params.append(where_lieu)
query += f" ORDER BY {COL_LIEU}, {COL_REPERE}, {COL_ADDR_HYPHEN}, {COL_ADDRESS}"
with get_conn() as conn:
df = pd.read_sql(query, conn, params=params)
return df
def insert_tracker(address: str, lieu: str, res_bits: int,
repere: str | None = None, mise_en_service: date | None = None,
address_hyphen: str | None = None) -> int:
addr_rom = (address or "").strip() if address else ""
addr_hyp = (address_hyphen or "").strip() if address_hyphen else ""
if addr_rom and not addr_hyp:
addr_hyp = rom_to_hyphen(addr_rom)
if addr_hyp and not addr_rom:
addr_rom = hyphen_to_rom(addr_hyp)
if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp):
raise ValueError("Adresse invalide (ROM ou hyphen).")
sql = f"""
INSERT INTO {TABLE_NAME}
({COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, {COL_REPERE}, {COL_MES}, {COL_RESBITS})
VALUES (%s, %s, %s, %s, %s, %s)
"""
with get_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (
addr_rom,
addr_hyp.lower(),
lieu,
(repere.strip() if repere and str(repere).strip() else None),
mise_en_service,
res_bits,
))
conn.commit()
return cur.lastrowid
def update_tracker(row_id: int, address: str, lieu: str, res_bits: int,
repere: str | None, mise_en_service: date | None,
address_hyphen: str | None = None) -> None:
addr_rom = (address or "").strip()
addr_hyp = (address_hyphen or "").strip() if address_hyphen else ""
if addr_rom and not addr_hyp:
addr_hyp = rom_to_hyphen(addr_rom)
if addr_hyp and not addr_rom:
addr_rom = hyphen_to_rom(addr_hyp)
if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp):
raise ValueError("Adresse invalide (ROM ou hyphen).")
sql = f"""
UPDATE {TABLE_NAME}
SET {COL_ADDRESS}=%s, {COL_ADDR_HYPHEN}=%s, {COL_LIEU}=%s, {COL_REPERE}=%s, {COL_MES}=%s, {COL_RESBITS}=%s
WHERE {COL_ID}=%s
"""
with get_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (
addr_rom,
addr_hyp.lower(),
lieu,
(repere.strip() if repere and str(repere).strip() else None),
mise_en_service,
res_bits,
row_id,
))
conn.commit()
def delete_tracker(row_id: int) -> None:
sql = f"DELETE FROM {TABLE_NAME} WHERE {COL_ID}=%s"
with get_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (row_id,))
conn.commit()
# ==================
# Application Streamlit
# ==================
st.set_page_config(page_title="Tracker", page_icon="🌡️", layout="wide")
user = require_login()
st.title("🌡️ Gestion du parc sondes (stock ou installées)")
with st.expander("Paramètres de connexion (lecture seule)"):
st.write({k: ("***" if k in {"password"} else v) for k, v in DB_CFG.items()})
st.caption("Configurez ces valeurs via le fichier .env")
st.sidebar.header("Filtres & Actions")
st.sidebar.caption(f"Connecté en tant que **{user}**")
_all = fetch_trackers()
lieux = sorted([x for x in _all[COL_LIEU].dropna().unique()]) if not _all.empty else []
lieu_selected = st.sidebar.selectbox("Filtrer par lieu", options=["(Tous)"] + lieux, index=0)
# Formulaire d'ajout
st.sidebar.subheader("Ajouter une sonde")
with st.sidebar.form("add_form", clear_on_submit=True):
new_address = st.text_input("Adresse ROM", placeholder="{0x28,0xFF,...}", help=rom_help())
preview_h = rom_to_hyphen(new_address) if new_address else ""
st.text_input("Adresse hyphen (auto)", value=preview_h, disabled=True)
new_lieu = st.text_input("Lieu d'installation")
new_repere = st.text_input("Repère (optionnel)")
new_mes = st.date_input("Mise en service (optionnel)", value=None, format="YYYY-MM-DD")
new_res = st.selectbox("Résolution (bits)", options=[9,10,11,12])
submitted = st.form_submit_button("Ajouter")
if submitted:
if not is_valid_rom(new_address):
st.warning("Adresse ROM invalide.")
elif not new_lieu.strip():
st.warning("Lieu requis.")
else:
rid = insert_tracker(
new_address.strip(),
new_lieu.strip(),
int(new_res),
new_repere,
new_mes if isinstance(new_mes, date) else None,
address_hyphen=rom_to_hyphen(new_address.strip()),
)
st.success(f"Sonde ajoutée (id={rid}).")
time.sleep(0.6)
st.rerun()
st.sidebar.divider()
if st.sidebar.button("Se déconnecter"):
for _k in list(st.session_state.keys()):
st.session_state.pop(_k, None)
st.success("Déconnecté.")
time.sleep(0.3)
st.rerun()
# Vue principale
if lieu_selected != "(Tous)":
df = fetch_trackers(where_lieu=lieu_selected)
else:
df = _all.copy()
if df.empty:
st.info("Aucune sonde enregistrée.")
else:
df["resolution"] = df[COL_RESBITS].apply(res_label)
st.subheader("Enregistrements")
edited = st.data_editor(
df[[COL_ID, COL_ADDRESS, COL_ADDR_HYPHEN, COL_LIEU, COL_REPERE, COL_MES, COL_RESBITS, "resolution", COL_DATE]],
hide_index=True,
use_container_width=True,
num_rows="dynamic",
)
removed_ids = set(df[COL_ID]) - set(edited[COL_ID])
to_update = []
for _, row in edited.iterrows():
orig = df.loc[df[COL_ID] == row[COL_ID]].iloc[0]
changed = (
(row[COL_ADDRESS] != orig[COL_ADDRESS]) or
(row[COL_ADDR_HYPHEN] != orig[COL_ADDR_HYPHEN]) or
(row[COL_LIEU] != orig[COL_LIEU]) or
(row.get(COL_REPERE) or "") != (orig.get(COL_REPERE) or "") or
(str(row.get(COL_MES) or "")[:10] != str(orig.get(COL_MES) or "")[:10]) or
(int(row[COL_RESBITS]) != int(orig[COL_RESBITS]))
)
if changed:
mes_val = row.get(COL_MES)
if pd.isna(mes_val):
mes_val = None
elif hasattr(mes_val, "date"):
mes_val = mes_val.date()
to_update.append((int(row[COL_ID]), str(row[COL_ADDRESS]), str(row[COL_ADDR_HYPHEN] or ""),
str(row[COL_LIEU]), int(row[COL_RESBITS]),
str(row.get(COL_REPERE) or None), mes_val))
col1, col2 = st.columns([1,1])
if st.button("Enregistrer les modifications"):
for rid, addr_rom, addr_hyp, lieu, rbits, repere, mes in to_update:
update_tracker(rid, addr_rom, lieu, rbits, repere, mes, address_hyphen=addr_hyp)
for rid in removed_ids:
delete_tracker(rid)
st.success("Modifications enregistrées ✔️")
time.sleep(0.6)
st.rerun()
if st.button("Annuler"):
st.rerun()
st.divider()
st.caption("Astuce : collez une adresse ROM {0x..,...} → la version hyphen est générée automatiquement.")

298
Outils/visualiseur_logs.py Normal file
View File

@@ -0,0 +1,298 @@
# visualiseur_logs.py
# Dépendances: streamlit, paramiko
# pip install streamlit paramiko
import html
import time
from datetime import datetime
import streamlit as st
try:
import paramiko
except ImportError:
paramiko = None
# =========================
# CONFIG — À RENSEIGNER
# =========================
VPS_HOST = "162.19.78.131" # ← mets ton IP/DNS
VPS_PORT = 22 # ← port SSH
VPS_USER = "debian" # ← utilisateur
VPS_PASSWORD = "lpZwixbBUFtGY" # ← mot de passe
VPS_LOG_DIR = "/home/debian/Gestion_sondes/Logs" # ← dossier des logs sur le VPS
# =========================
# UI
# =========================
st.set_page_config(page_title="Visualiseur de Logs (VPS, password)", layout="wide")
st.title("🧾 Visualiseur de fichiers logs (VPS)")
st.caption(f"Cible : {VPS_USER}@{VPS_HOST}:{VPS_PORT} • Dossier logs : {VPS_LOG_DIR}")
if paramiko is None:
st.error("Paramiko nest pas installé. Exécute : pip install paramiko")
st.stop()
# Barre latérale : options daffichage & refresh
with st.sidebar:
st.header("⚙️ Options")
auto_refresh = st.toggle("🔄 Rafraîchissement auto", value=False, key="auto_refresh")
refresh_interval = st.slider("Intervalle (secondes)", 2, 60, 5, key="refresh_interval")
if st.button("Rafraîchir maintenant"):
st.rerun()
# =========================
# FONCTIONS SSH
# =========================
def ssh_connect_password(host, port, user, password):
"""Retourne un client SSH connecté (password)."""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=host, port=int(port), username=user, password=password, timeout=10)
return ssh
def list_logs_over_ssh(ssh, log_dir):
"""
Liste les *.log du dossier (sans sous-dossiers), triés par mtime desc.
Retourne [{'name': 'f.log', 'mtime': 1234567890.0, 'size': 1024}, ...]
"""
cmd = (
f"bash -lc \"find '{log_dir}' -maxdepth 1 -type f -name '*.log' "
f"-printf '%T@ %s %f\\n' 2>/dev/null | sort -nr\""
)
_, stdout, stderr = ssh.exec_command(cmd)
_ = stderr.read().decode(errors="ignore") # on ignore les warnings find
out = stdout.read().decode(errors="ignore")
items = []
for line in out.splitlines():
parts = line.strip().split(" ", 2)
if len(parts) < 3:
continue
try:
mtime = float(parts[0]); size = int(parts[1]); name = parts[2]
items.append({"name": name, "mtime": mtime, "size": size})
except Exception:
continue
return items
def read_tail_over_ssh(ssh, remote_path, n_lines):
"""Lit les N dernières lignes via 'tail -n' (rapide sur gros logs)."""
n = max(1, int(n_lines))
cmd = f"bash -lc \"tail -n {n} '{remote_path}'\""
_, stdout, stderr = ssh.exec_command(cmd)
err = stderr.read().decode(errors="ignore")
if "No such file" in err:
raise FileNotFoundError(err)
return stdout.read().decode(errors="ignore")
def backup_and_truncate_remote(ssh, remote_path):
"""Crée une sauvegarde horodatée .bak et tronque le fichier à 0 octet. Retourne le chemin .bak."""
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
bak = f"{remote_path}.{ts}.bak"
cmd = (
f"bash -lc \"cp '{remote_path}' '{bak}' 2>/dev/null || true; "
f"truncate -s 0 '{remote_path}' 2>/dev/null || : > '{remote_path}'\""
)
_, out, err = ssh.exec_command(cmd)
_ = out.read()
e = err.read().decode(errors="ignore").strip()
if "No such file" in e:
raise FileNotFoundError(e)
return bak
# =========================
# CONNEXION & LISTE
# =========================
if not all([VPS_HOST, VPS_USER, VPS_PASSWORD, VPS_LOG_DIR]):
st.error("Complète les constantes en haut du fichier (hôte/utilisateur/mot de passe/dossier logs).")
st.stop()
try:
ssh = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
except Exception as e:
st.error(f"❌ Connexion SSH échouée : {e}")
st.stop()
try:
logs = list_logs_over_ssh(ssh, VPS_LOG_DIR)
except Exception as e:
ssh.close()
st.error(f"❌ Impossible de lister les logs : {e}")
st.stop()
if not logs:
ssh.close()
st.warning("Aucun fichier *.log trouvé dans ce dossier sur le VPS.")
st.stop()
fichiers = [x["name"] for x in logs]
choix = st.selectbox("📂 Sélectionnez un fichier log :", fichiers, index=0)
log_info = next((x for x in logs if x["name"] == choix), logs[0])
log_path = f"{VPS_LOG_DIR.rstrip('/')}/{choix}"
mtime_dt = datetime.fromtimestamp(log_info["mtime"])
st.caption(f"`{choix}` — Taille : {log_info['size']:,} o — Modifié : {mtime_dt:%Y-%m-%d %H:%M:%S}")
# =========================
# OPTIONS & LECTURE
# =========================
col1, col2, col3 = st.columns([1, 1, 1.2])
with col1:
filtre_erreurs = st.checkbox(
"🔍 Afficher uniquement les erreurs",
value=False,
help="Filtre sur ERROR, ❌, Traceback, failed, exception, critical, fatal"
)
with col2:
nb_lignes = st.slider("📏 Lignes à afficher", 10, 5000, 300)
with col3:
highlight = st.checkbox(
"🖍️ Surligner erreurs/avertissements",
value=True,
help="Met en évidence ERROR/CRITICAL/EXCEPTION (rouge) et WARN (jaune)"
)
# Lecture (on prend une marge quand filtre actif)
try:
marge = 300 if filtre_erreurs else 0
content = read_tail_over_ssh(ssh, log_path, nb_lignes + marge)
lignes = content.splitlines(keepends=True)
except Exception as e:
ssh.close()
st.error(f"Impossible de lire le fichier : {e}")
st.stop()
# On peut fermer maintenant (les actions rouvriront une session propre)
ssh.close()
# Filtrage
if filtre_erreurs:
err_keys = ["error", "traceback", "failed", "exception", "critical", "fatal"]
lignes = [l for l in lignes if any(k in l.lower() for k in err_keys)]
dernieres = lignes[-nb_lignes:]
# Surlignage
def colorize(lines):
out = []
for l in lines:
low = l.lower()
style = (
"font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace;"
"white-space: pre-wrap; margin: 0; padding: 2px 6px; border-radius: 4px;"
)
bg = None
if any(k in low for k in ["error", "traceback", "failed", "exception", "critical", "fatal", ""]):
bg = "#ffe6e6" # rouge clair
elif "warn" in low:
bg = "#fff8e1" # jaune clair
if bg:
style += f"background:{bg};"
out.append(f"<div style='{style}'>{html.escape(l)}</div>")
return "\n".join(out)
if highlight:
st.markdown(colorize(dernieres), unsafe_allow_html=True)
else:
st.text_area("📄 Contenu du fichier log :", "".join(dernieres), height=600)
st.divider()
# =========================
# ACTIONS (Backup + Vidage)
# =========================
st.subheader("🧰 Actions sur ce fichier (VPS)")
colA, colB, colC = st.columns([1, 1, 2])
with colA:
faire_backup = st.checkbox("Créer une copie .bak avant vidage", value=True,
help="Copie horodatée à côté du fichier.")
with colB:
confirmation = st.checkbox("Je confirme vouloir vider", value=False)
with colC:
vider = st.button("🧹 Vider ce log", use_container_width=True)
if vider:
if not confirmation:
st.warning("❗ Coche dabord « Je confirme vouloir vider » pour éviter les erreurs.")
else:
try:
ssh2 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
with ssh2:
if faire_backup:
bak_path = backup_and_truncate_remote(ssh2, log_path)
st.info(f"📦 Copie de sauvegarde créée : `{bak_path}`")
else:
cmd = f"bash -lc \"truncate -s 0 '{log_path}' 2>/dev/null || : > '{log_path}'\""
_, out, err = ssh2.exec_command(cmd)
_ = out.read(); _ = err.read()
st.success("✅ Fichier vidé avec succès.")
st.rerun()
except PermissionError:
st.error("⛔ Permission refusée. Vérifie les droits sur le fichier/dossier.")
except FileNotFoundError as e:
st.error(f"📁 Fichier/dossier introuvable : {e}")
except Exception as e:
st.error(f"❌ Échec du vidage : {e}")
# =========================
# PURGE DE PLUSIEURS LOGS
# =========================
st.subheader("🗑️ Purge de plusieurs logs (VPS)")
# On reliste tous les logs disponibles
try:
ssh3 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
all_logs = list_logs_over_ssh(ssh3, VPS_LOG_DIR)
ssh3.close()
except Exception as e:
all_logs = []
st.error(f"Impossible de relister les logs : {e}")
if all_logs:
age_jours = st.number_input("Supprimer les fichiers plus vieux que (jours)", 1, 365, 7)
now = time.time()
candidats = [x for x in all_logs if (now - x["mtime"]) / 86400 >= age_jours]
if not candidats:
st.info("Aucun fichier ne correspond au filtre d'âge.")
else:
st.write(f"📂 {len(candidats)} fichier(s) plus vieux que {age_jours} jours trouvé(s).")
selection = []
for x in candidats:
label = f'{x["name"]}{x["size"]/1024:.1f} Ko — {datetime.fromtimestamp(x["mtime"]).strftime("%Y-%m-%d %H:%M")}'
if st.checkbox(label, key=f"purge_{x['name']}"):
selection.append(x["name"])
if selection:
st.warning(f"{len(selection)} fichier(s) sélectionné(s) pour suppression définitive.")
confirm = st.checkbox("Je confirme la suppression définitive", key="confirm_purge")
if st.button("❌ Supprimer les fichiers sélectionnés", type="primary", disabled=not confirm):
try:
ssh4 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD)
for fname in selection:
remote_path = f"{VPS_LOG_DIR.rstrip('/')}/{fname}"
cmd = f"bash -lc \"rm -f '{remote_path}'\""
_, out, err = ssh4.exec_command(cmd)
_ = out.read(); _ = err.read()
ssh4.close()
st.success(f"{len(selection)} fichier(s) supprimé(s).")
st.rerun()
except Exception as e:
st.error(f"Erreur lors de la suppression : {e}")
else:
st.info("Aucun log trouvé pour la purge.")
# =========================
# AUTO-REFRESH (fin de script)
# =========================
if auto_refresh:
# Affiche une petite info et relance la page après X secondes
st.caption(f"🔄 Rafraîchissement auto activé — Prochaine mise à jour dans {refresh_interval} s…")
time.sleep(refresh_interval)
st.rerun()