diff --git a/.env b/.env index cc35abb..3ba74a9 100644 --- a/.env +++ b/.env @@ -1,14 +1,25 @@ -# OVH_SMS_SENDER=DOMO91FR #connexion mysql DB_HOST=162.19.78.131 DB_USER=sondes -DB_PASS='TX.)-U1!zq5Axdk4' +DB_PASS=TX.)-U1!zq5Axdk4 DB_NAME=Sondes +AUTH_USERS=[{"user":"Michel","pass":"210462"}] -# MQTT + +# MQTT Saclay MQTT_HOST=54.36.188.119 MQTT_USER=Bwps -MQTT_PASS='scJ5ACj2keRfI^' +MQTT_PASS=scJ5ACj2keRfI^ + +# --- MQTT Meudon --- +MQTT_HOST_MEUDON=162.19.78.131 +MQTT_USER_MEUDON=sondes +MQTT_PASS_MEUDON=3J@bjYP0 +MQTT_PORT_MEUDON=1883 + +# Topic gyrophare Meudon +GYRO_MQTT_TOPIC_MEUDON=Meudon/gyrophare + # Boucle rapide du gyro GYRO_MODE=mqtt @@ -28,6 +39,17 @@ ALERT_LOOKBACK_MINUTES=120 # Logs LOGLEVEL=INFO +# === Connexion SSH pour visualiseur_logs.py === +SSH_HOST=162.19.78.131 +SSH_PORT=22 +SSH_USER=debian +SSH_KEY_PATH=/home/debian/.ssh/id_ed25519 +SSH_KEY_PASSPHRASE='gaby' +SSH_LOG_DIR=/home/debian/Gestion_sondes/Logs +VPS_HOST=162.19.78.131 + + + # paramètres mail SMTP_HOST=ssl0.ovh.net SMTP_PORT=587 diff --git a/Outils/Injection_tests.py b/Outils/Injection_tests.py deleted file mode 100644 index 2edc299..0000000 --- a/Outils/Injection_tests.py +++ /dev/null @@ -1,236 +0,0 @@ -import os -import datetime as dt -import pandas as pd -import streamlit as st -from dotenv import load_dotenv -import mysql.connector as mc - -# ---------------------- -# Config de la page -# ---------------------- -st.set_page_config(page_title="Injection de données de test", page_icon="🧪", layout="centered") -st.title("🧪 Injecteur de relevés de test (Sondes)") -st.caption("Crée ~10 lignes au-dessus d'un seuil pour tester les alertes") - -# ---------------------- -# Connexion MySQL depuis .env -# ---------------------- -@st.cache_resource(show_spinner=False) -def get_connection(): - load_dotenv() - try: - cnx = mc.connect( - host=os.getenv("DB_HOST"), - user=os.getenv("DB_USER"), - password=os.getenv("DB_PASS"), - database=os.getenv("DB_NAME"), - autocommit=True, - ) - return cnx - except Exception as e: - st.error(f"Échec de connexion MySQL : {e}") - raise - -# ---------------------- -# Helpers : liste des sondes actives et hors entretien -# ---------------------- -@st.cache_data(ttl=60, show_spinner=False) -def list_sondes(site: str) -> list: - """Retourne la liste des sondes actives (Etat=ON) et non en entretien pour le site. - Essaie d'abord monitor_{site}, puis Chambres_froides, sinon fallback via la table de mesures. - """ - cnx = get_connection() - cur = cnx.cursor() - # 1) monitor_{site} - try: - cur.execute( - f""" - SELECT Sonde - FROM `monitor_{site}` - WHERE (Etat='ON' OR Etat=1) - AND ( (Maintenance='OFF') OR (Maintenance=0) OR (Maintenance IS NULL) ) - ORDER BY Sonde - """ - ) - rows = [r[0] for r in cur.fetchall()] - if rows: - return rows - except Exception: - pass - # 2) Chambres_froides - try: - cur.execute( - """ - SELECT Sonde - FROM `Chambres_froides` - WHERE Lieu=%s AND (Etat='ON' OR Etat=1) - AND ( (Maintenance='OFF') OR (Maintenance=0) OR (Maintenance IS NULL) ) - ORDER BY Sonde - """, - (site,) - ) - rows = [r[0] for r in cur.fetchall()] - if rows: - return rows - except Exception: - pass - # 3) Fallback : dernier état via table de mesures (distinct) - try: - cur.execute( - f"SELECT DISTINCT Sonde FROM `{site}` ORDER BY Sonde LIMIT 200" - ) - return [r[0] for r in cur.fetchall()] - except Exception: - return [] - -# ---------------------- -# Helper : récupérer Temp_Max pour une sonde donnée -# ---------------------- -@st.cache_data(ttl=60, show_spinner=False) -def get_temp_max(site: str, sonde: str): - """Retourne Temp_Max pour (site, sonde) en cherchant d'abord dans monitor_{site}, puis Chambres_froides. - Renvoie None si non trouvé.""" - try: - cnx = get_connection() - cur = cnx.cursor() - # 1) monitor_{site} - try: - cur.execute( - f"SELECT Temp_Max FROM `monitor_{site}` WHERE Sonde=%s LIMIT 1", - (sonde,) - ) - row = cur.fetchone() - if row and row[0] is not None: - return float(row[0]) - except Exception: - pass - # 2) Chambres_froides - try: - cur.execute( - """ - SELECT Temp_Max - FROM `Chambres_froides` - WHERE Lieu=%s AND Sonde=%s - LIMIT 1 - """, - (site, sonde) - ) - row = cur.fetchone() - if row and row[0] is not None: - return float(row[0]) - except Exception: - pass - except Exception: - return None - return None - -# ---------------------- -# UI paramètres -# ---------------------- -with st.sidebar: - st.header("Paramètres") - site = st.selectbox("Site (table)", ["Saclay", "Meudon"], index=0) - - # Sélecteur de sonde depuis la liste active / hors entretien - options_sondes = list_sondes(site) - if not options_sondes: - st.warning("Aucune sonde active trouvée (ou table monitor introuvable). Vous pouvez saisir un nom manuel.") - sonde = st.text_input("Nom de la sonde", value="TEST_Chambre1") - else: - sonde = st.selectbox("Sonde (actives, hors entretien)", options_sondes) - - st.subheader("Température") - # Auto-remplissage du seuil depuis la base et verrouillage par défaut - _temp_db = get_temp_max(site, sonde) - if _temp_db is None: - st.warning("Temp_Max introuvable en base ; valeur par défaut 6.0°C.") - _temp_db = 6.0 - allow_edit = st.checkbox("Autoriser la modification du seuil", value=False) - temp_max = st.number_input("Seuil (Temp_Max)", value=float(_temp_db), step=0.1, disabled=not allow_edit) - - delta = st.number_input("Delta au-dessus du seuil", value=1.0, step=0.1) - absolute_override = st.checkbox("Définir une température absolue à la place") - absolute_temp = st.number_input( - "Température absolue (si coché)", value=12.5, step=0.1, disabled=not absolute_override - ) - - st.subheader("Série temporelle") - rows = st.number_input("Nombre de points", min_value=1, max_value=200, value=10, step=1) - step_min = st.number_input("Pas (minutes)", min_value=1, max_value=120, value=5, step=1) - start_offset = st.number_input("Début : il y a (minutes)", min_value=0, max_value=1440, value=45, step=5) - - st.markdown("---") - st.caption("Nettoyage rapide") - cleanup_scope = st.selectbox("Supprimer", ["Cette sonde", "Toutes les TEST_ des dernières 24h"]) - do_cleanup = st.button("🧹 Supprimer les données de test") -col1, col2 = st.columns(2) -# ---------------------- -# Actions -# ---------------------- -if col1.button("🚀 Injecter les données"): - try: - cnx = get_connection() - cur = cnx.cursor() - - # Calcul des timestamps et de la valeur - now = dt.datetime.now() - t0 = now - dt.timedelta(minutes=int(start_offset)) - if absolute_override: - value = float(absolute_temp) - else: - value = float(temp_max) + float(delta) - - # Préparation batch INSERT - sql = f"INSERT INTO `{site}` (Sonde, Temperature, Date) VALUES (%s,%s,%s)" - batch = [] - for i in range(int(rows)): - ts = t0 + dt.timedelta(minutes=i * int(step_min)) - batch.append((sonde, value, ts.strftime("%Y-%m-%d %H:%M:%S"))) - - cur.executemany(sql, batch) - st.success(f"{len(batch)} lignes insérées dans `{site}` pour **{sonde}** à **{value}°C**.") - - # Aperçu des données insérées - cur.execute( - f""" - SELECT Id, Sonde, Temperature, Date - FROM `{site}` - WHERE Sonde = %s AND Date >= %s AND Date <= %s - ORDER BY Date DESC - LIMIT 50 - """, - ( - sonde, - (t0 - dt.timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S"), - (t0 + dt.timedelta(minutes=int(rows)*int(step_min) + 1)).strftime("%Y-%m-%d %H:%M:%S"), - ), - ) - rows_preview = cur.fetchall() - if rows_preview: - df = pd.DataFrame(rows_preview, columns=["Id", "Sonde", "Temperature", "Date"]) - st.dataframe(df, use_container_width=True, hide_index=True) - else: - st.info("Aucune ligne trouvée pour l'aperçu (vérifiez les filtres/horaires).") - - except Exception as e: - st.error(f"Erreur lors de l'injection : {e}") - -# Nettoyage -def cleanup(): - cnx = get_connection() - cur = cnx.cursor() - if cleanup_scope == "Cette sonde": - cur.execute(f"DELETE FROM `{site}` WHERE Sonde = %s", (sonde,)) - st.success(f"Données supprimées pour la sonde **{sonde}** dans `{site}`.") - else: - cur.execute( - f"DELETE FROM `{site}` WHERE Sonde LIKE 'TEST\_%' ESCAPE '\\' AND Date >= NOW() - INTERVAL 1 DAY" - ) - st.success(f"Toutes les sondes **TEST_** des dernières 24h supprimées dans `{site}`.") - -if do_cleanup: - try: - cleanup() - except Exception as e: - st.error(f"Erreur de nettoyage : {e}") - diff --git a/Outils/visualiseur_logs.py b/Outils/visualiseur_logs.py deleted file mode 100644 index 9e9e3d3..0000000 --- a/Outils/visualiseur_logs.py +++ /dev/null @@ -1,298 +0,0 @@ -# visualiseur_logs.py -# Dépendances: streamlit, paramiko -# pip install streamlit paramiko - -import html -import time -from datetime import datetime -import streamlit as st - -try: - import paramiko -except ImportError: - paramiko = None - - -# ========================= -# CONFIG — À RENSEIGNER -# ========================= -VPS_HOST = "162.19.78.131" # ← mets ton IP/DNS -VPS_PORT = 22 # ← port SSH -VPS_USER = "debian" # ← utilisateur -VPS_PASSWORD = "lpZwixbBUFtGY" # ← mot de passe -VPS_LOG_DIR = "/home/debian/Gestion_sondes/Logs" # ← dossier des logs sur le VPS - - -# ========================= -# UI -# ========================= -st.set_page_config(page_title="Visualiseur de Logs (VPS, password)", layout="wide") -st.title("🧾 Visualiseur de fichiers logs (VPS)") -st.caption(f"Cible : {VPS_USER}@{VPS_HOST}:{VPS_PORT} • Dossier logs : {VPS_LOG_DIR}") - -if paramiko is None: - st.error("Paramiko n’est pas installé. Exécute : pip install paramiko") - st.stop() - -# Barre latérale : options d’affichage & refresh -with st.sidebar: - st.header("⚙️ Options") - auto_refresh = st.toggle("🔄 Rafraîchissement auto", value=False, key="auto_refresh") - refresh_interval = st.slider("Intervalle (secondes)", 2, 60, 5, key="refresh_interval") - if st.button("Rafraîchir maintenant"): - st.rerun() - - -# ========================= -# FONCTIONS SSH -# ========================= -def ssh_connect_password(host, port, user, password): - """Retourne un client SSH connecté (password).""" - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(hostname=host, port=int(port), username=user, password=password, timeout=10) - return ssh - -def list_logs_over_ssh(ssh, log_dir): - """ - Liste les *.log du dossier (sans sous-dossiers), triés par mtime desc. - Retourne [{'name': 'f.log', 'mtime': 1234567890.0, 'size': 1024}, ...] - """ - cmd = ( - f"bash -lc \"find '{log_dir}' -maxdepth 1 -type f -name '*.log' " - f"-printf '%T@ %s %f\\n' 2>/dev/null | sort -nr\"" - ) - _, stdout, stderr = ssh.exec_command(cmd) - _ = stderr.read().decode(errors="ignore") # on ignore les warnings find - out = stdout.read().decode(errors="ignore") - - items = [] - for line in out.splitlines(): - parts = line.strip().split(" ", 2) - if len(parts) < 3: - continue - try: - mtime = float(parts[0]); size = int(parts[1]); name = parts[2] - items.append({"name": name, "mtime": mtime, "size": size}) - except Exception: - continue - return items - -def read_tail_over_ssh(ssh, remote_path, n_lines): - """Lit les N dernières lignes via 'tail -n' (rapide sur gros logs).""" - n = max(1, int(n_lines)) - cmd = f"bash -lc \"tail -n {n} '{remote_path}'\"" - _, stdout, stderr = ssh.exec_command(cmd) - err = stderr.read().decode(errors="ignore") - if "No such file" in err: - raise FileNotFoundError(err) - return stdout.read().decode(errors="ignore") - -def backup_and_truncate_remote(ssh, remote_path): - """Crée une sauvegarde horodatée .bak et tronque le fichier à 0 octet. Retourne le chemin .bak.""" - ts = datetime.now().strftime("%Y%m%d-%H%M%S") - bak = f"{remote_path}.{ts}.bak" - cmd = ( - f"bash -lc \"cp '{remote_path}' '{bak}' 2>/dev/null || true; " - f"truncate -s 0 '{remote_path}' 2>/dev/null || : > '{remote_path}'\"" - ) - _, out, err = ssh.exec_command(cmd) - _ = out.read() - e = err.read().decode(errors="ignore").strip() - if "No such file" in e: - raise FileNotFoundError(e) - return bak - - -# ========================= -# CONNEXION & LISTE -# ========================= -if not all([VPS_HOST, VPS_USER, VPS_PASSWORD, VPS_LOG_DIR]): - st.error("Complète les constantes en haut du fichier (hôte/utilisateur/mot de passe/dossier logs).") - st.stop() - -try: - ssh = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD) -except Exception as e: - st.error(f"❌ Connexion SSH échouée : {e}") - st.stop() - -try: - logs = list_logs_over_ssh(ssh, VPS_LOG_DIR) -except Exception as e: - ssh.close() - st.error(f"❌ Impossible de lister les logs : {e}") - st.stop() - -if not logs: - ssh.close() - st.warning("Aucun fichier *.log trouvé dans ce dossier sur le VPS.") - st.stop() - -fichiers = [x["name"] for x in logs] -choix = st.selectbox("📂 Sélectionnez un fichier log :", fichiers, index=0) -log_info = next((x for x in logs if x["name"] == choix), logs[0]) -log_path = f"{VPS_LOG_DIR.rstrip('/')}/{choix}" - -mtime_dt = datetime.fromtimestamp(log_info["mtime"]) -st.caption(f"`{choix}` — Taille : {log_info['size']:,} o — Modifié : {mtime_dt:%Y-%m-%d %H:%M:%S}") - -# ========================= -# OPTIONS & LECTURE -# ========================= -col1, col2, col3 = st.columns([1, 1, 1.2]) -with col1: - filtre_erreurs = st.checkbox( - "🔍 Afficher uniquement les erreurs", - value=False, - help="Filtre sur ERROR, ❌, Traceback, failed, exception, critical, fatal" - ) -with col2: - nb_lignes = st.slider("📏 Lignes à afficher", 10, 5000, 30) -with col3: - highlight = st.checkbox( - "🖍️ Surligner erreurs/avertissements", - value=True, - help="Met en évidence ERROR/CRITICAL/EXCEPTION (rouge) et WARN (jaune)" - ) - -# Lecture (on prend une marge quand filtre actif) -try: - marge = 300 if filtre_erreurs else 0 - content = read_tail_over_ssh(ssh, log_path, nb_lignes + marge) - lignes = content.splitlines(keepends=True) -except Exception as e: - ssh.close() - st.error(f"Impossible de lire le fichier : {e}") - st.stop() - -# On peut fermer maintenant (les actions rouvriront une session propre) -ssh.close() - -# Filtrage -if filtre_erreurs: - err_keys = ["error", "traceback", "failed", "exception", "critical", "fatal"] - lignes = [l for l in lignes if any(k in l.lower() for k in err_keys)] - -dernieres = lignes[-nb_lignes:] - -# Surlignage -def colorize(lines): - out = [] - for l in lines: - low = l.lower() - style = ( - "font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace;" - "white-space: pre-wrap; margin: 0; padding: 2px 6px; border-radius: 4px;" - ) - bg = None - if any(k in low for k in ["error", "traceback", "failed", "exception", "critical", "fatal", "❌"]): - bg = "#ffe6e6" # rouge clair - elif "warn" in low: - bg = "#fff8e1" # jaune clair - if bg: - style += f"background:{bg};" - out.append(f"
{html.escape(l)}
") - return "\n".join(out) - -if highlight: - st.markdown(colorize(dernieres), unsafe_allow_html=True) -else: - st.text_area("📄 Contenu du fichier log :", "".join(dernieres), height=600) - -st.divider() - -# ========================= -# ACTIONS (Backup + Vidage) -# ========================= -st.subheader("🧰 Actions sur ce fichier (VPS)") -colA, colB, colC = st.columns([1, 1, 2]) -with colA: - faire_backup = st.checkbox("Créer une copie .bak avant vidage", value=True, - help="Copie horodatée à côté du fichier.") -with colB: - confirmation = st.checkbox("Je confirme vouloir vider", value=False) -with colC: - vider = st.button("🧹 Vider ce log", use_container_width=True) - -if vider: - if not confirmation: - st.warning("❗ Coche d’abord « Je confirme vouloir vider » pour éviter les erreurs.") - else: - try: - ssh2 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD) - with ssh2: - if faire_backup: - bak_path = backup_and_truncate_remote(ssh2, log_path) - st.info(f"📦 Copie de sauvegarde créée : `{bak_path}`") - else: - cmd = f"bash -lc \"truncate -s 0 '{log_path}' 2>/dev/null || : > '{log_path}'\"" - _, out, err = ssh2.exec_command(cmd) - _ = out.read(); _ = err.read() - - st.success("✅ Fichier vidé avec succès.") - st.rerun() - except PermissionError: - st.error("⛔ Permission refusée. Vérifie les droits sur le fichier/dossier.") - except FileNotFoundError as e: - st.error(f"📁 Fichier/dossier introuvable : {e}") - except Exception as e: - st.error(f"❌ Échec du vidage : {e}") - -# ========================= -# PURGE DE PLUSIEURS LOGS -# ========================= -st.subheader("🗑️ Purge de plusieurs logs (VPS)") - -# On reliste tous les logs disponibles -try: - ssh3 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD) - all_logs = list_logs_over_ssh(ssh3, VPS_LOG_DIR) - ssh3.close() -except Exception as e: - all_logs = [] - st.error(f"Impossible de relister les logs : {e}") - -if all_logs: - age_jours = st.number_input("Supprimer les fichiers plus vieux que (jours)", 1, 365, 7) - now = time.time() - candidats = [x for x in all_logs if (now - x["mtime"]) / 86400 >= age_jours] - - if not candidats: - st.info("Aucun fichier ne correspond au filtre d'âge.") - else: - st.write(f"📂 {len(candidats)} fichier(s) plus vieux que {age_jours} jours trouvé(s).") - - selection = [] - for x in candidats: - label = f'{x["name"]} — {x["size"]/1024:.1f} Ko — {datetime.fromtimestamp(x["mtime"]).strftime("%Y-%m-%d %H:%M")}' - if st.checkbox(label, key=f"purge_{x['name']}"): - selection.append(x["name"]) - - if selection: - st.warning(f"{len(selection)} fichier(s) sélectionné(s) pour suppression définitive.") - confirm = st.checkbox("Je confirme la suppression définitive", key="confirm_purge") - if st.button("❌ Supprimer les fichiers sélectionnés", type="primary", disabled=not confirm): - try: - ssh4 = ssh_connect_password(VPS_HOST, VPS_PORT, VPS_USER, VPS_PASSWORD) - for fname in selection: - remote_path = f"{VPS_LOG_DIR.rstrip('/')}/{fname}" - cmd = f"bash -lc \"rm -f '{remote_path}'\"" - _, out, err = ssh4.exec_command(cmd) - _ = out.read(); _ = err.read() - ssh4.close() - st.success(f"✅ {len(selection)} fichier(s) supprimé(s).") - st.rerun() - except Exception as e: - st.error(f"Erreur lors de la suppression : {e}") -else: - st.info("Aucun log trouvé pour la purge.") - -# ========================= -# AUTO-REFRESH (fin de script) -# ========================= -if auto_refresh: - # Affiche une petite info et relance la page après X secondes - st.caption(f"🔄 Rafraîchissement auto activé — Prochaine mise à jour dans {refresh_interval} s…") - time.sleep(refresh_interval) - st.rerun() diff --git a/app/Monitor_Meudon.py b/app/Monitor_Meudon.py index f6423f8..fe36365 100644 --- a/app/Monitor_Meudon.py +++ b/app/Monitor_Meudon.py @@ -5,12 +5,19 @@ SITE = "Meudon" PROGRAM_NAME = f"Monitor_{SITE}" -# ========= Imports & .env ========= -import os, re, time, ssl, smtplib, logging import datetime as dt -from email.message import EmailMessage +# ========= Imports & .env ========= +import logging +import os +import re +import smtplib +import ssl +import time from datetime import datetime +from email.message import EmailMessage + from dotenv import load_dotenv, find_dotenv + load_dotenv(find_dotenv(usecwd=True), override=False) from utils_sms import normaliser_sms diff --git a/app/Mqtt_meudon.py b/app/Mqtt_meudon.py new file mode 100644 index 0000000..d25b97e --- /dev/null +++ b/app/Mqtt_meudon.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +""" +Mqtt_meudon.py +Récupère les mesures MQTT du site Meudon et les insère dans la table Sondes.Meudon. +""" + +import os +import logging +from logging.handlers import RotatingFileHandler + +import mysql.connector +from mysql.connector import Error + +import paho.mqtt.client as mqtt +from paho.mqtt.client import CallbackAPIVersion + +from dotenv import load_dotenv + +# ========================= +# Chargement du .env +# ========================= +load_dotenv() + +# --- MySQL (commun) --- +DB_HOST = os.getenv("DB_HOST") +DB_USER = os.getenv("DB_USER") +DB_PASS = os.getenv("DB_PASS") +DB_NAME = os.getenv("DB_NAME") + +# --- MQTT Meudon --- +MQTT_HOST = os.getenv("MQTT_HOST_MEUDON", os.getenv("MQTT_HOST")) +MQTT_USER = os.getenv("MQTT_USER_MEUDON", os.getenv("MQTT_USER")) +MQTT_PASS = os.getenv("MQTT_PASS_MEUDON", os.getenv("MQTT_PASS")) +MQTT_PORT = int(os.getenv("MQTT_PORT_MEUDON", os.getenv("MQTT_PORT", 1883))) + +GYRO_TOPIC_MEUDON = os.getenv("GYRO_MQTT_TOPIC_MEUDON", "Meudon/gyrophare") + +# Nom de la table de destination +TABLE_NAME = "Meudon" + +# ========================= +# Logging +# ========================= +def setup_logging(): + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + + # Console + console = logging.StreamHandler() + console.setFormatter(formatter) + logger.addHandler(console) + + # Logs fichier (même logique que Saclay) + log_dir = os.getenv("LOG_DIR", "./Logs") + try: + os.makedirs(log_dir, exist_ok=True) + file_handler = RotatingFileHandler( + os.path.join(log_dir, "Mqtt_meudon.log"), + maxBytes=1_000_000, + backupCount=5, + encoding="utf-8", + ) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + except Exception as e: + logging.warning("Impossible de créer le fichier de log : %s", e) + + +# ========================= +# Accès MySQL +# ========================= +def insert_temperature(sonde: str, temperature: float) -> None: + """ + Insère une mesure dans la table Sondes.Meudon. + La colonne Date utilise CURRENT_TIMESTAMP par défaut dans MySQL. + """ + try: + conn = mysql.connector.connect( + host=DB_HOST, + user=DB_USER, + password=DB_PASS, + database=DB_NAME, + ) + + cursor = conn.cursor() + sql = f"INSERT INTO {TABLE_NAME} (Sonde, Temperature) VALUES (%s, %s)" + cursor.execute(sql, (sonde, temperature)) + conn.commit() + + logging.info("Insertion OK (Meudon) -> %s = %.2f", sonde, temperature) + + except Error as e: + logging.exception("Erreur MySQL (Meudon) pour la sonde %s : %s", sonde, e) + + finally: + try: + if cursor: + cursor.close() + if conn and conn.is_connected(): + conn.close() + except Exception: + pass + + +# ========================= +# Callbacks MQTT (API v2) +# ========================= +def on_connect(client, userdata, flags, reason_code, properties=None): + if reason_code == 0: + logging.info("Connecté au broker MQTT Meudon (%s)", MQTT_HOST) + client.subscribe("Meudon/mod02/#") + logging.info("Abonné au topic : Meudon/#") + else: + logging.error("Échec de connexion MQTT (Meudon), code retour = %s", reason_code) + + +def on_message(client, userdata, msg: mqtt.MQTTMessage): + topic = msg.topic + payload_raw = msg.payload.decode("utf-8", errors="ignore").strip() + + logging.debug("Msg reçu (Meudon) : topic=%s payload=%s", topic, payload_raw) + + # On ignore le gyrophare + if topic == GYRO_TOPIC_MEUDON: + logging.debug("Topic gyrophare Meudon ignoré : %s", topic) + return + + # Nom de la sonde = dernier segment du topic + sonde = topic.split("/")[-1] if "/" in topic else topic + + # Conversion du payload en float + try: + value = float(payload_raw.replace(",", ".")) + except ValueError: + logging.warning( + "Payload non numérique (Meudon), mesure ignorée (topic=%s, payload=%s)", + topic, + payload_raw, + ) + return + + insert_temperature(sonde, value) + + +# ========================= +# Programme principal +# ========================= +def main(): + setup_logging() + logging.info("Démarrage du script Mqtt_meudon") + + # Vérif minimale des variables d'env + for var in ["DB_HOST", "DB_USER", "DB_PASS", "DB_NAME"]: + if os.getenv(var) in (None, ""): + logging.error("Variable d'environnement %s manquante !", var) + + client = mqtt.Client( + client_id="Mqtt_meudon_client", + callback_api_version=CallbackAPIVersion.VERSION2 + ) + client.username_pw_set(MQTT_USER, MQTT_PASS) + + client.on_connect = on_connect + client.on_message = on_message + + try: + client.connect(MQTT_HOST, MQTT_PORT, keepalive=60) + except Exception as e: + logging.exception("Impossible de se connecter au broker MQTT Meudon : %s", e) + return + + logging.info("Boucle MQTT Meudon en cours (Ctrl+C pour arrêter)...") + try: + client.loop_forever() + except KeyboardInterrupt: + logging.info("Arrêt demandé par l'utilisateur (Meudon).") + finally: + client.disconnect() + logging.info("Client MQTT Meudon déconnecté.") + + +if __name__ == "__main__": + main() diff --git a/app/Mqtt_saclay.py b/app/Mqtt_saclay.py new file mode 100644 index 0000000..0b30c5a --- /dev/null +++ b/app/Mqtt_saclay.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +""" +Mqtt_saclay.py +Récupère les mesures MQTT du site Saclay et les insère dans la table Sondes.Saclay. +""" + +import os +import logging +from logging.handlers import RotatingFileHandler + +import mysql.connector +from mysql.connector import Error + +import paho.mqtt.client as mqtt +from paho.mqtt.client import CallbackAPIVersion + +from dotenv import load_dotenv + +# ========================= +# Chargement du .env +# ========================= +load_dotenv() + +DB_HOST = os.getenv("DB_HOST") +DB_USER = os.getenv("DB_USER") +DB_PASS = os.getenv("DB_PASS") +DB_NAME = os.getenv("DB_NAME") + +MQTT_HOST = os.getenv("MQTT_HOST") +MQTT_USER = os.getenv("MQTT_USER") +MQTT_PASS = os.getenv("MQTT_PASS") +MQTT_PORT = int(os.getenv("MQTT_PORT", 1883)) + +GYRO_TOPIC_SACLAY = os.getenv("GYRO_MQTT_TOPIC_SACLAY", "Saclay/gyrophare") +TABLE_NAME = "Saclay" + +# ========================= +# Logging +# ========================= +def setup_logging(): + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + + # Console + console = logging.StreamHandler() + console.setFormatter(formatter) + logger.addHandler(console) + + # Logs fichier + log_dir = os.getenv("LOG_DIR", "./Logs") + try: + os.makedirs(log_dir, exist_ok=True) + file_handler = RotatingFileHandler( + os.path.join(log_dir, "Mqtt_saclay.log"), + maxBytes=1_000_000, + backupCount=5, + encoding="utf-8", + ) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + except Exception as e: + logging.warning("Impossible de créer le fichier de log : %s", e) + + +# ========================= +# Accès MySQL +# ========================= +def insert_temperature(sonde: str, temperature: float) -> None: + try: + conn = mysql.connector.connect( + host=DB_HOST, + user=DB_USER, + password=DB_PASS, + database=DB_NAME, + ) + + cursor = conn.cursor() + sql = f"INSERT INTO {TABLE_NAME} (Sonde, Temperature) VALUES (%s, %s)" + cursor.execute(sql, (sonde, temperature)) + conn.commit() + + logging.info("Insertion OK -> %s = %.2f", sonde, temperature) + + except Error as e: + logging.exception("Erreur MySQL pour la sonde %s : %s", sonde, e) + + finally: + try: + if cursor: + cursor.close() + if conn and conn.is_connected(): + conn.close() + except Exception: + pass + + +# ========================= +# Callbacks MQTT (API v2) +# ========================= +def on_connect(client, userdata, flags, reason_code, properties=None): + if reason_code == 0: + logging.info("Connecté au broker MQTT (%s)", MQTT_HOST) + client.subscribe("Saclay/#") + logging.info("Abonné au topic : Saclay/#") + else: + logging.error("Échec de connexion MQTT, code retour = %s", reason_code) + + +def on_message(client, userdata, msg: mqtt.MQTTMessage): + topic = msg.topic + payload_raw = msg.payload.decode("utf-8", errors="ignore").strip() + + logging.debug("Msg reçu : topic=%s payload=%s", topic, payload_raw) + + if topic == GYRO_TOPIC_SACLAY: + return # on ignore le gyrophare + + sonde = topic.split("/")[-1] if "/" in topic else topic + + try: + value = float(payload_raw.replace(",", ".")) + except ValueError: + logging.warning("Payload non numérique (topic=%s payload=%s)", topic, payload_raw) + return + + insert_temperature(sonde, value) + + +# ========================= +# Programme principal +# ========================= +def main(): + setup_logging() + logging.info("Démarrage du script Mqtt_saclay") + + client = mqtt.Client( + client_id="Mqtt_saclay_client", + callback_api_version=CallbackAPIVersion.VERSION2 + ) + client.username_pw_set(MQTT_USER, MQTT_PASS) + + client.on_connect = on_connect + client.on_message = on_message + + try: + client.connect(MQTT_HOST, MQTT_PORT, keepalive=60) + except Exception as e: + logging.exception("Impossible de se connecter au broker MQTT : %s", e) + return + + logging.info("Boucle MQTT en cours (Ctrl+C pour arrêter)...") + try: + client.loop_forever() + except KeyboardInterrupt: + logging.info("Arrêt demandé par l'utilisateur.") + finally: + client.disconnect() + logging.info("Déconnexion MQTT.") + + +if __name__ == "__main__": + main() diff --git a/app/logger_config.py b/app/logger_config.py deleted file mode 100644 index abe0245..0000000 --- a/app/logger_config.py +++ /dev/null @@ -1,19 +0,0 @@ -import os, logging - -def setup_logger(filename: str): - # si l'argument est un chemin absolu, on le respecte - if os.path.isabs(filename): - log_path = filename - else: - # ancien comportement (ex: /var/log/Cuisine_Saclay/) - base = "/var/log/Cuisine_Saclay" - os.makedirs(base, exist_ok=True) - log_path = os.path.join(base, filename) - - os.makedirs(os.path.dirname(log_path), exist_ok=True) - logging.basicConfig( - level=logging.INFO, - filename=log_path, - format="%(asctime)s %(levelname)s %(message)s" - ) - diff --git a/app/mqtt_logger.py b/app/mqtt_logger.py deleted file mode 100644 index 9ffb9de..0000000 --- a/app/mqtt_logger.py +++ /dev/null @@ -1,48 +0,0 @@ -import argparse -import paho.mqtt.client as mqtt_client -from dotenv import load_dotenv -import logging -from logger_config import setup_logger -from utils_db import connect_to_mysql -from functools import partial - -def on_message(table_sql, _client, _userdata, msg): - try: - logging.info(f"Message reçu sur {msg.topic}: {msg.payload.decode()}") - cursor = mydb.cursor() - sonde_name = '/'.join(msg.topic.split('/')[1:]) - sql = f"INSERT INTO {table_sql} (Sonde, Temperature) VALUES (%s, %s)" - val = (sonde_name, msg.payload.decode()) - cursor.execute(sql, val) - mydb.commit() - logging.info(f"Insertion réussie : {val}") - except Exception as e: - logging.error(f"Erreur lors de l'insertion du message : {e}") - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--log", required=True, help="Nom du fichier de log") - parser.add_argument("--table", required=True, help="Nom complet de la table SQL") - parser.add_argument("--topic", required=True, help="Topic MQTT à écouter") - args = parser.parse_args() - - # 📋 Initialiser le logger - setup_logger(args.log) - - # 🔑 Charger les variables d'environnement - load_dotenv() - - # 🔌 Connexion MySQL - mydb = connect_to_mysql() - - # 📡 Connexion MQTT - try: - client = mqtt_client.Client() - client.username_pw_set("Bwps", "scJ5ACj2keRfI^") - client.on_message = partial(on_message, args.table) - client.connect("54.36.188.119", 1883, 60) - client.subscribe(args.topic) - logging.info(f"Connexion MQTT réussie et abonnement au topic '{args.topic}'.") - client.loop_forever() - except Exception as err: - logging.error(f"Erreur MQTT : {err}") diff --git a/app/mqtt_watchdog.py b/app/mqtt_watchdog.py new file mode 100644 index 0000000..ad23b23 --- /dev/null +++ b/app/mqtt_watchdog.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import argparse +import logging +import os +import sys +import time +from datetime import datetime, timedelta, timezone +import smtplib +from email.mime.text import MIMEText +from email.utils import formatdate +import threading + +import paho.mqtt.client as mqtt +from dotenv import load_dotenv; load_dotenv() + +# ---------- Configuration par défaut ---------- +DEFAULT_BROKER_HOST = os.getenv("MQTT_HOST", "54.36.188.119") +DEFAULT_BROKER_PORT = int(os.getenv("MQTT_PORT", "1883")) +DEFAULT_MQTT_USER = os.getenv("MQTT_USER", "Bwps") +DEFAULT_MQTT_PASS = os.getenv("MQTT_PASS", "scJ5ACj2keRfI^") + +# Email (OVH SMTP par ex.) +SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net") +SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # 465=SSL, 587=STARTTLS +SMTP_USER = os.getenv("SMTP_USER", "") +SMTP_PASS = os.getenv("SMTP_PASS", "") +MAIL_FROM = os.getenv("MAIL_FROM", SMTP_USER or "alerte@exemple.fr") +MAIL_TO = os.getenv("MAIL_TO", "") # "toi@domaine.fr,ops@domaine.fr" + +# Webhook SMS optionnel (ex: Free Mobile / OVH / autre) +WEBHOOK_SMS_URL = os.getenv("WEBHOOK_SMS_URL", "") # ex: https://smsapi.free-mobile.fr/sendmsg?user=XXX&pass=YYY&msg= + +# ---------- Helpers ---------- +def setup_logger(logfile: str | None): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + handlers=[logging.StreamHandler(sys.stdout)] if not logfile else [ + logging.FileHandler(logfile), + logging.StreamHandler(sys.stdout) + ], + ) + +def now_utc(): + return datetime.now(timezone.utc) + +def fmt_local(dt: datetime): + # Affichage lisible en Europe/Paris + try: + import zoneinfo + tz = zoneinfo.ZoneInfo("Europe/Paris") + return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z") + except Exception: + return dt.strftime("%Y-%m-%d %H:%M:%S UTC") + +# ---------- Notifiers ---------- +def send_email(subject: str, body: str): + if not (SMTP_HOST and SMTP_USER and SMTP_PASS and MAIL_TO and MAIL_FROM): + logging.warning("Email non configuré (variables SMTP_* / MAIL_* manquantes).") + return + msg = MIMEText(body, _charset="utf-8") + msg["Subject"] = subject + msg["From"] = MAIL_FROM + msg["To"] = MAIL_TO + msg["Date"] = formatdate(localtime=True) + + try: + if SMTP_PORT == 465: + import ssl + context = ssl.create_default_context() + with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, context=context, timeout=20) as server: + server.login(SMTP_USER, SMTP_PASS) + server.sendmail(MAIL_FROM, MAIL_TO.split(","), msg.as_string()) + else: + with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as server: + server.ehlo() + server.starttls() + server.login(SMTP_USER, SMTP_PASS) + server.sendmail(MAIL_FROM, MAIL_TO.split(","), msg.as_string()) + logging.info("Email envoyé.") + except Exception as e: + logging.error(f"Echec envoi email: {e}") + +def send_sms_via_webhook(text: str): + if not WEBHOOK_SMS_URL: + return + try: + import urllib.parse, urllib.request + url = WEBHOOK_SMS_URL + urllib.parse.quote(text) + with urllib.request.urlopen(url, timeout=10) as resp: + _ = resp.read() + logging.info("SMS (webhook) envoyé.") + except Exception as e: + logging.error(f"Echec envoi SMS webhook: {e}") + +def notify(subject: str, body: str): + send_email(subject, body) + # SMS via webhook (décommente si configuré) + send_sms_via_webhook(f"{subject} - {body}") + # Ou Twilio (si tu ajoutes la fonction et les variables d'env) + +# ---------- Watchdog ---------- +class SiteStatus: + def __init__(self, name: str, threshold_min: int): + self.name = name + self.threshold = timedelta(minutes=threshold_min) + self.last_seen: datetime | None = None + self.alert_sent = False + + def seen_now(self): + self.last_seen = now_utc() + + def check_and_alert(self): + now = now_utc() + if self.last_seen is None: + # Au démarrage, on attend de dépasser le seuil avant d’alerter + return None + delta = now - self.last_seen + if delta > self.threshold: + if not self.alert_sent: + self.alert_sent = True + return ("OUTAGE", + f"{self.name} : plus de données depuis {fmt_local(self.last_seen)} " + f"(écoulé: {int(delta.total_seconds()//60)} min).") + else: + if self.alert_sent: + self.alert_sent = False + return ("RECOVERY", + f"{self.name} : flux rétabli, dernier message à {fmt_local(self.last_seen)}.") + return None + +class MqttWatchdog: + def __init__(self, broker_host, broker_port, user, pwd, topics, threshold_min, check_every_s): + # API callbacks v2 (évite le DeprecationWarning) + self.client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2) + self.client.username_pw_set(user, pwd) + self.client.on_connect = self._on_connect + self.client.on_message = self._on_message + self.client.on_disconnect = self._on_disconnect + + self.broker_host = broker_host + self.broker_port = broker_port + self.topics = topics # liste de tuples (topic, qos) + self.check_every_s = check_every_s + + # Statuts par site, déduits du préfixe: "Meudon/#" -> "Meudon" + self.sites: dict[str, SiteStatus] = {} + for t, _q in topics: + site = t.split("/", 1)[0] + self.sites[site] = SiteStatus(site, threshold_min) + + self._stop = threading.Event() + self._checker_thread = threading.Thread(target=self._checker_loop, daemon=True) + + # MQTT callbacks (API v2) + def _on_connect(self, client, userdata, flags, reason_code, properties=None): + if reason_code == 0: + logging.info("Connecté au broker MQTT.") + for t, q in self.topics: + client.subscribe(t, qos=q) + logging.info(f"Abonné à '{t}' (QoS {q})") + else: + logging.error(f"Echec connexion MQTT (reason_code={reason_code})") + + def _on_disconnect(self, client, userdata, reason_code, properties=None): + logging.warning(f"MQTT déconnecté (reason_code={reason_code}). Reconnexion auto gérée par loop_* si activée.") + + def _on_message(self, client, userdata, msg): + # Topic attendu: "Meudon/..." ou "Saclay/..." + site = msg.topic.split("/", 1)[0] + if site in self.sites: + self.sites[site].seen_now() + logging.debug(f"{site}: message reçu, mise à jour last_seen.") + + # Thread de vérification périodique + def _checker_loop(self): + while not self._stop.is_set(): + for site, status in self.sites.items(): + event = status.check_and_alert() + if event: + kind, text = event + if kind == "OUTAGE": + subject = f"[ALERTE] {site} inactif > seuil" + body = (f"Watchdog MQTT : {text}\n" + f"Seuil: {status.threshold} | Vérif {self.check_every_s}s\n" + f"Broker: {self.broker_host}:{self.broker_port}") + notify(subject, body) + logging.warning(text) + elif kind == "RECOVERY": + subject = f"[OK] {site} rétabli" + body = (f"Watchdog MQTT : {text}\n" + f"Seuil: {status.threshold} | Vérif {self.check_every_s}s\n" + f"Broker: {self.broker_host}:{self.broker_port}") + notify(subject, body) + logging.info(text) + self._stop.wait(self.check_every_s) + + def start(self): + self.client.connect(self.broker_host, self.broker_port, keepalive=60) + self.client.loop_start() # thread interne MQTT + reconnexions auto + self._checker_thread.start() + logging.info("Watchdog démarré.") + + def stop(self): + self._stop.set() + self._checker_thread.join(timeout=2) + self.client.loop_stop() + self.client.disconnect() + +# ---------- Main ---------- +def parse_args(): + p = argparse.ArgumentParser(description="Watchdog MQTT par site (inactivité > seuil)") + p.add_argument("--log", help="Chemin du fichier log (sinon stdout).") + p.add_argument("--broker-host", default=DEFAULT_BROKER_HOST) + p.add_argument("--broker-port", type=int, default=DEFAULT_BROKER_PORT) + p.add_argument("--mqtt-user", default=DEFAULT_MQTT_USER) + p.add_argument("--mqtt-pass", default=DEFAULT_MQTT_PASS) + p.add_argument("--threshold-min", type=int, default=15, help="Seuil d'inactivité en minutes") + p.add_argument("--check-every-s", type=int, default=60, help="Périodicité de vérification en secondes") + p.add_argument("--topics", default="Meudon/#,Saclay/#", help="liste de topics 'Site/#' séparés par des virgules") + return p.parse_args() + +if __name__ == "__main__": + args = parse_args() + setup_logger(args.log) + + topics = [] + for t in [x.strip() for x in args.topics.split(",") if x.strip()]: + topics.append((t, 1)) + + watchdog = MqttWatchdog( + broker_host=args.broker_host, + broker_port=args.broker_port, + user=args.mqtt_user, + pwd=args.mqtt_pass, + topics=topics, + threshold_min=args.threshold_min, + check_every_s=args.check_every_s, + ) + + try: + watchdog.start() + while True: + time.sleep(3600) + except KeyboardInterrupt: + pass + except Exception as e: + logging.error(f"Erreur fatale: {e}") + finally: + watchdog.stop() + logging.info("Watchdog arrêté.") diff --git a/requirements.txt b/requirements.txt index 6904ab7..b9424ed 100644 Binary files a/requirements.txt and b/requirements.txt differ