Remise en état des relevés temp
This commit is contained in:
@@ -5,12 +5,19 @@
|
||||
SITE = "Meudon"
|
||||
PROGRAM_NAME = f"Monitor_{SITE}"
|
||||
|
||||
# ========= Imports & .env =========
|
||||
import os, re, time, ssl, smtplib, logging
|
||||
import datetime as dt
|
||||
from email.message import EmailMessage
|
||||
# ========= Imports & .env =========
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import smtplib
|
||||
import ssl
|
||||
import time
|
||||
from datetime import datetime
|
||||
from email.message import EmailMessage
|
||||
|
||||
from dotenv import load_dotenv, find_dotenv
|
||||
|
||||
load_dotenv(find_dotenv(usecwd=True), override=False)
|
||||
from utils_sms import normaliser_sms
|
||||
|
||||
|
||||
188
app/Mqtt_meudon.py
Normal file
188
app/Mqtt_meudon.py
Normal file
@@ -0,0 +1,188 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Mqtt_meudon.py
|
||||
Récupère les mesures MQTT du site Meudon et les insère dans la table Sondes.Meudon.
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
from paho.mqtt.client import CallbackAPIVersion
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# =========================
|
||||
# Chargement du .env
|
||||
# =========================
|
||||
load_dotenv()
|
||||
|
||||
# --- MySQL (commun) ---
|
||||
DB_HOST = os.getenv("DB_HOST")
|
||||
DB_USER = os.getenv("DB_USER")
|
||||
DB_PASS = os.getenv("DB_PASS")
|
||||
DB_NAME = os.getenv("DB_NAME")
|
||||
|
||||
# --- MQTT Meudon ---
|
||||
MQTT_HOST = os.getenv("MQTT_HOST_MEUDON", os.getenv("MQTT_HOST"))
|
||||
MQTT_USER = os.getenv("MQTT_USER_MEUDON", os.getenv("MQTT_USER"))
|
||||
MQTT_PASS = os.getenv("MQTT_PASS_MEUDON", os.getenv("MQTT_PASS"))
|
||||
MQTT_PORT = int(os.getenv("MQTT_PORT_MEUDON", os.getenv("MQTT_PORT", 1883)))
|
||||
|
||||
GYRO_TOPIC_MEUDON = os.getenv("GYRO_MQTT_TOPIC_MEUDON", "Meudon/gyrophare")
|
||||
|
||||
# Nom de la table de destination
|
||||
TABLE_NAME = "Meudon"
|
||||
|
||||
# =========================
|
||||
# Logging
|
||||
# =========================
|
||||
def setup_logging():
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(levelname)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
# Console
|
||||
console = logging.StreamHandler()
|
||||
console.setFormatter(formatter)
|
||||
logger.addHandler(console)
|
||||
|
||||
# Logs fichier (même logique que Saclay)
|
||||
log_dir = os.getenv("LOG_DIR", "./Logs")
|
||||
try:
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
file_handler = RotatingFileHandler(
|
||||
os.path.join(log_dir, "Mqtt_meudon.log"),
|
||||
maxBytes=1_000_000,
|
||||
backupCount=5,
|
||||
encoding="utf-8",
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
logger.addHandler(file_handler)
|
||||
except Exception as e:
|
||||
logging.warning("Impossible de créer le fichier de log : %s", e)
|
||||
|
||||
|
||||
# =========================
|
||||
# Accès MySQL
|
||||
# =========================
|
||||
def insert_temperature(sonde: str, temperature: float) -> None:
|
||||
"""
|
||||
Insère une mesure dans la table Sondes.Meudon.
|
||||
La colonne Date utilise CURRENT_TIMESTAMP par défaut dans MySQL.
|
||||
"""
|
||||
try:
|
||||
conn = mysql.connector.connect(
|
||||
host=DB_HOST,
|
||||
user=DB_USER,
|
||||
password=DB_PASS,
|
||||
database=DB_NAME,
|
||||
)
|
||||
|
||||
cursor = conn.cursor()
|
||||
sql = f"INSERT INTO {TABLE_NAME} (Sonde, Temperature) VALUES (%s, %s)"
|
||||
cursor.execute(sql, (sonde, temperature))
|
||||
conn.commit()
|
||||
|
||||
logging.info("Insertion OK (Meudon) -> %s = %.2f", sonde, temperature)
|
||||
|
||||
except Error as e:
|
||||
logging.exception("Erreur MySQL (Meudon) pour la sonde %s : %s", sonde, e)
|
||||
|
||||
finally:
|
||||
try:
|
||||
if cursor:
|
||||
cursor.close()
|
||||
if conn and conn.is_connected():
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# =========================
|
||||
# Callbacks MQTT (API v2)
|
||||
# =========================
|
||||
def on_connect(client, userdata, flags, reason_code, properties=None):
|
||||
if reason_code == 0:
|
||||
logging.info("Connecté au broker MQTT Meudon (%s)", MQTT_HOST)
|
||||
client.subscribe("Meudon/mod02/#")
|
||||
logging.info("Abonné au topic : Meudon/#")
|
||||
else:
|
||||
logging.error("Échec de connexion MQTT (Meudon), code retour = %s", reason_code)
|
||||
|
||||
|
||||
def on_message(client, userdata, msg: mqtt.MQTTMessage):
|
||||
topic = msg.topic
|
||||
payload_raw = msg.payload.decode("utf-8", errors="ignore").strip()
|
||||
|
||||
logging.debug("Msg reçu (Meudon) : topic=%s payload=%s", topic, payload_raw)
|
||||
|
||||
# On ignore le gyrophare
|
||||
if topic == GYRO_TOPIC_MEUDON:
|
||||
logging.debug("Topic gyrophare Meudon ignoré : %s", topic)
|
||||
return
|
||||
|
||||
# Nom de la sonde = dernier segment du topic
|
||||
sonde = topic.split("/")[-1] if "/" in topic else topic
|
||||
|
||||
# Conversion du payload en float
|
||||
try:
|
||||
value = float(payload_raw.replace(",", "."))
|
||||
except ValueError:
|
||||
logging.warning(
|
||||
"Payload non numérique (Meudon), mesure ignorée (topic=%s, payload=%s)",
|
||||
topic,
|
||||
payload_raw,
|
||||
)
|
||||
return
|
||||
|
||||
insert_temperature(sonde, value)
|
||||
|
||||
|
||||
# =========================
|
||||
# Programme principal
|
||||
# =========================
|
||||
def main():
|
||||
setup_logging()
|
||||
logging.info("Démarrage du script Mqtt_meudon")
|
||||
|
||||
# Vérif minimale des variables d'env
|
||||
for var in ["DB_HOST", "DB_USER", "DB_PASS", "DB_NAME"]:
|
||||
if os.getenv(var) in (None, ""):
|
||||
logging.error("Variable d'environnement %s manquante !", var)
|
||||
|
||||
client = mqtt.Client(
|
||||
client_id="Mqtt_meudon_client",
|
||||
callback_api_version=CallbackAPIVersion.VERSION2
|
||||
)
|
||||
client.username_pw_set(MQTT_USER, MQTT_PASS)
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.on_message = on_message
|
||||
|
||||
try:
|
||||
client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
|
||||
except Exception as e:
|
||||
logging.exception("Impossible de se connecter au broker MQTT Meudon : %s", e)
|
||||
return
|
||||
|
||||
logging.info("Boucle MQTT Meudon en cours (Ctrl+C pour arrêter)...")
|
||||
try:
|
||||
client.loop_forever()
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Arrêt demandé par l'utilisateur (Meudon).")
|
||||
finally:
|
||||
client.disconnect()
|
||||
logging.info("Client MQTT Meudon déconnecté.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
166
app/Mqtt_saclay.py
Normal file
166
app/Mqtt_saclay.py
Normal file
@@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Mqtt_saclay.py
|
||||
Récupère les mesures MQTT du site Saclay et les insère dans la table Sondes.Saclay.
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
from paho.mqtt.client import CallbackAPIVersion
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# =========================
|
||||
# Chargement du .env
|
||||
# =========================
|
||||
load_dotenv()
|
||||
|
||||
DB_HOST = os.getenv("DB_HOST")
|
||||
DB_USER = os.getenv("DB_USER")
|
||||
DB_PASS = os.getenv("DB_PASS")
|
||||
DB_NAME = os.getenv("DB_NAME")
|
||||
|
||||
MQTT_HOST = os.getenv("MQTT_HOST")
|
||||
MQTT_USER = os.getenv("MQTT_USER")
|
||||
MQTT_PASS = os.getenv("MQTT_PASS")
|
||||
MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
|
||||
|
||||
GYRO_TOPIC_SACLAY = os.getenv("GYRO_MQTT_TOPIC_SACLAY", "Saclay/gyrophare")
|
||||
TABLE_NAME = "Saclay"
|
||||
|
||||
# =========================
|
||||
# Logging
|
||||
# =========================
|
||||
def setup_logging():
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
# Console
|
||||
console = logging.StreamHandler()
|
||||
console.setFormatter(formatter)
|
||||
logger.addHandler(console)
|
||||
|
||||
# Logs fichier
|
||||
log_dir = os.getenv("LOG_DIR", "./Logs")
|
||||
try:
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
file_handler = RotatingFileHandler(
|
||||
os.path.join(log_dir, "Mqtt_saclay.log"),
|
||||
maxBytes=1_000_000,
|
||||
backupCount=5,
|
||||
encoding="utf-8",
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
logger.addHandler(file_handler)
|
||||
except Exception as e:
|
||||
logging.warning("Impossible de créer le fichier de log : %s", e)
|
||||
|
||||
|
||||
# =========================
|
||||
# Accès MySQL
|
||||
# =========================
|
||||
def insert_temperature(sonde: str, temperature: float) -> None:
|
||||
try:
|
||||
conn = mysql.connector.connect(
|
||||
host=DB_HOST,
|
||||
user=DB_USER,
|
||||
password=DB_PASS,
|
||||
database=DB_NAME,
|
||||
)
|
||||
|
||||
cursor = conn.cursor()
|
||||
sql = f"INSERT INTO {TABLE_NAME} (Sonde, Temperature) VALUES (%s, %s)"
|
||||
cursor.execute(sql, (sonde, temperature))
|
||||
conn.commit()
|
||||
|
||||
logging.info("Insertion OK -> %s = %.2f", sonde, temperature)
|
||||
|
||||
except Error as e:
|
||||
logging.exception("Erreur MySQL pour la sonde %s : %s", sonde, e)
|
||||
|
||||
finally:
|
||||
try:
|
||||
if cursor:
|
||||
cursor.close()
|
||||
if conn and conn.is_connected():
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# =========================
|
||||
# Callbacks MQTT (API v2)
|
||||
# =========================
|
||||
def on_connect(client, userdata, flags, reason_code, properties=None):
|
||||
if reason_code == 0:
|
||||
logging.info("Connecté au broker MQTT (%s)", MQTT_HOST)
|
||||
client.subscribe("Saclay/#")
|
||||
logging.info("Abonné au topic : Saclay/#")
|
||||
else:
|
||||
logging.error("Échec de connexion MQTT, code retour = %s", reason_code)
|
||||
|
||||
|
||||
def on_message(client, userdata, msg: mqtt.MQTTMessage):
|
||||
topic = msg.topic
|
||||
payload_raw = msg.payload.decode("utf-8", errors="ignore").strip()
|
||||
|
||||
logging.debug("Msg reçu : topic=%s payload=%s", topic, payload_raw)
|
||||
|
||||
if topic == GYRO_TOPIC_SACLAY:
|
||||
return # on ignore le gyrophare
|
||||
|
||||
sonde = topic.split("/")[-1] if "/" in topic else topic
|
||||
|
||||
try:
|
||||
value = float(payload_raw.replace(",", "."))
|
||||
except ValueError:
|
||||
logging.warning("Payload non numérique (topic=%s payload=%s)", topic, payload_raw)
|
||||
return
|
||||
|
||||
insert_temperature(sonde, value)
|
||||
|
||||
|
||||
# =========================
|
||||
# Programme principal
|
||||
# =========================
|
||||
def main():
|
||||
setup_logging()
|
||||
logging.info("Démarrage du script Mqtt_saclay")
|
||||
|
||||
client = mqtt.Client(
|
||||
client_id="Mqtt_saclay_client",
|
||||
callback_api_version=CallbackAPIVersion.VERSION2
|
||||
)
|
||||
client.username_pw_set(MQTT_USER, MQTT_PASS)
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.on_message = on_message
|
||||
|
||||
try:
|
||||
client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
|
||||
except Exception as e:
|
||||
logging.exception("Impossible de se connecter au broker MQTT : %s", e)
|
||||
return
|
||||
|
||||
logging.info("Boucle MQTT en cours (Ctrl+C pour arrêter)...")
|
||||
try:
|
||||
client.loop_forever()
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Arrêt demandé par l'utilisateur.")
|
||||
finally:
|
||||
client.disconnect()
|
||||
logging.info("Déconnexion MQTT.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,19 +0,0 @@
|
||||
import os, logging
|
||||
|
||||
def setup_logger(filename: str):
|
||||
# si l'argument est un chemin absolu, on le respecte
|
||||
if os.path.isabs(filename):
|
||||
log_path = filename
|
||||
else:
|
||||
# ancien comportement (ex: /var/log/Cuisine_Saclay/<filename>)
|
||||
base = "/var/log/Cuisine_Saclay"
|
||||
os.makedirs(base, exist_ok=True)
|
||||
log_path = os.path.join(base, filename)
|
||||
|
||||
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
filename=log_path,
|
||||
format="%(asctime)s %(levelname)s %(message)s"
|
||||
)
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
import argparse
|
||||
import paho.mqtt.client as mqtt_client
|
||||
from dotenv import load_dotenv
|
||||
import logging
|
||||
from logger_config import setup_logger
|
||||
from utils_db import connect_to_mysql
|
||||
from functools import partial
|
||||
|
||||
def on_message(table_sql, _client, _userdata, msg):
|
||||
try:
|
||||
logging.info(f"Message reçu sur {msg.topic}: {msg.payload.decode()}")
|
||||
cursor = mydb.cursor()
|
||||
sonde_name = '/'.join(msg.topic.split('/')[1:])
|
||||
sql = f"INSERT INTO {table_sql} (Sonde, Temperature) VALUES (%s, %s)"
|
||||
val = (sonde_name, msg.payload.decode())
|
||||
cursor.execute(sql, val)
|
||||
mydb.commit()
|
||||
logging.info(f"Insertion réussie : {val}")
|
||||
except Exception as e:
|
||||
logging.error(f"Erreur lors de l'insertion du message : {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--log", required=True, help="Nom du fichier de log")
|
||||
parser.add_argument("--table", required=True, help="Nom complet de la table SQL")
|
||||
parser.add_argument("--topic", required=True, help="Topic MQTT à écouter")
|
||||
args = parser.parse_args()
|
||||
|
||||
# 📋 Initialiser le logger
|
||||
setup_logger(args.log)
|
||||
|
||||
# 🔑 Charger les variables d'environnement
|
||||
load_dotenv()
|
||||
|
||||
# 🔌 Connexion MySQL
|
||||
mydb = connect_to_mysql()
|
||||
|
||||
# 📡 Connexion MQTT
|
||||
try:
|
||||
client = mqtt_client.Client()
|
||||
client.username_pw_set("Bwps", "scJ5ACj2keRfI^")
|
||||
client.on_message = partial(on_message, args.table)
|
||||
client.connect("54.36.188.119", 1883, 60)
|
||||
client.subscribe(args.topic)
|
||||
logging.info(f"Connexion MQTT réussie et abonnement au topic '{args.topic}'.")
|
||||
client.loop_forever()
|
||||
except Exception as err:
|
||||
logging.error(f"Erreur MQTT : {err}")
|
||||
253
app/mqtt_watchdog.py
Normal file
253
app/mqtt_watchdog.py
Normal file
@@ -0,0 +1,253 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from email.utils import formatdate
|
||||
import threading
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
from dotenv import load_dotenv; load_dotenv()
|
||||
|
||||
# ---------- Configuration par défaut ----------
|
||||
DEFAULT_BROKER_HOST = os.getenv("MQTT_HOST", "54.36.188.119")
|
||||
DEFAULT_BROKER_PORT = int(os.getenv("MQTT_PORT", "1883"))
|
||||
DEFAULT_MQTT_USER = os.getenv("MQTT_USER", "Bwps")
|
||||
DEFAULT_MQTT_PASS = os.getenv("MQTT_PASS", "scJ5ACj2keRfI^")
|
||||
|
||||
# Email (OVH SMTP par ex.)
|
||||
SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
|
||||
SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # 465=SSL, 587=STARTTLS
|
||||
SMTP_USER = os.getenv("SMTP_USER", "")
|
||||
SMTP_PASS = os.getenv("SMTP_PASS", "")
|
||||
MAIL_FROM = os.getenv("MAIL_FROM", SMTP_USER or "alerte@exemple.fr")
|
||||
MAIL_TO = os.getenv("MAIL_TO", "") # "toi@domaine.fr,ops@domaine.fr"
|
||||
|
||||
# Webhook SMS optionnel (ex: Free Mobile / OVH / autre)
|
||||
WEBHOOK_SMS_URL = os.getenv("WEBHOOK_SMS_URL", "") # ex: https://smsapi.free-mobile.fr/sendmsg?user=XXX&pass=YYY&msg=
|
||||
|
||||
# ---------- Helpers ----------
|
||||
def setup_logger(logfile: str | None):
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(message)s",
|
||||
handlers=[logging.StreamHandler(sys.stdout)] if not logfile else [
|
||||
logging.FileHandler(logfile),
|
||||
logging.StreamHandler(sys.stdout)
|
||||
],
|
||||
)
|
||||
|
||||
def now_utc():
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
def fmt_local(dt: datetime):
|
||||
# Affichage lisible en Europe/Paris
|
||||
try:
|
||||
import zoneinfo
|
||||
tz = zoneinfo.ZoneInfo("Europe/Paris")
|
||||
return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
except Exception:
|
||||
return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
|
||||
|
||||
# ---------- Notifiers ----------
|
||||
def send_email(subject: str, body: str):
|
||||
if not (SMTP_HOST and SMTP_USER and SMTP_PASS and MAIL_TO and MAIL_FROM):
|
||||
logging.warning("Email non configuré (variables SMTP_* / MAIL_* manquantes).")
|
||||
return
|
||||
msg = MIMEText(body, _charset="utf-8")
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = MAIL_FROM
|
||||
msg["To"] = MAIL_TO
|
||||
msg["Date"] = formatdate(localtime=True)
|
||||
|
||||
try:
|
||||
if SMTP_PORT == 465:
|
||||
import ssl
|
||||
context = ssl.create_default_context()
|
||||
with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, context=context, timeout=20) as server:
|
||||
server.login(SMTP_USER, SMTP_PASS)
|
||||
server.sendmail(MAIL_FROM, MAIL_TO.split(","), msg.as_string())
|
||||
else:
|
||||
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as server:
|
||||
server.ehlo()
|
||||
server.starttls()
|
||||
server.login(SMTP_USER, SMTP_PASS)
|
||||
server.sendmail(MAIL_FROM, MAIL_TO.split(","), msg.as_string())
|
||||
logging.info("Email envoyé.")
|
||||
except Exception as e:
|
||||
logging.error(f"Echec envoi email: {e}")
|
||||
|
||||
def send_sms_via_webhook(text: str):
|
||||
if not WEBHOOK_SMS_URL:
|
||||
return
|
||||
try:
|
||||
import urllib.parse, urllib.request
|
||||
url = WEBHOOK_SMS_URL + urllib.parse.quote(text)
|
||||
with urllib.request.urlopen(url, timeout=10) as resp:
|
||||
_ = resp.read()
|
||||
logging.info("SMS (webhook) envoyé.")
|
||||
except Exception as e:
|
||||
logging.error(f"Echec envoi SMS webhook: {e}")
|
||||
|
||||
def notify(subject: str, body: str):
|
||||
send_email(subject, body)
|
||||
# SMS via webhook (décommente si configuré)
|
||||
send_sms_via_webhook(f"{subject} - {body}")
|
||||
# Ou Twilio (si tu ajoutes la fonction et les variables d'env)
|
||||
|
||||
# ---------- Watchdog ----------
|
||||
class SiteStatus:
|
||||
def __init__(self, name: str, threshold_min: int):
|
||||
self.name = name
|
||||
self.threshold = timedelta(minutes=threshold_min)
|
||||
self.last_seen: datetime | None = None
|
||||
self.alert_sent = False
|
||||
|
||||
def seen_now(self):
|
||||
self.last_seen = now_utc()
|
||||
|
||||
def check_and_alert(self):
|
||||
now = now_utc()
|
||||
if self.last_seen is None:
|
||||
# Au démarrage, on attend de dépasser le seuil avant d’alerter
|
||||
return None
|
||||
delta = now - self.last_seen
|
||||
if delta > self.threshold:
|
||||
if not self.alert_sent:
|
||||
self.alert_sent = True
|
||||
return ("OUTAGE",
|
||||
f"{self.name} : plus de données depuis {fmt_local(self.last_seen)} "
|
||||
f"(écoulé: {int(delta.total_seconds()//60)} min).")
|
||||
else:
|
||||
if self.alert_sent:
|
||||
self.alert_sent = False
|
||||
return ("RECOVERY",
|
||||
f"{self.name} : flux rétabli, dernier message à {fmt_local(self.last_seen)}.")
|
||||
return None
|
||||
|
||||
class MqttWatchdog:
|
||||
def __init__(self, broker_host, broker_port, user, pwd, topics, threshold_min, check_every_s):
|
||||
# API callbacks v2 (évite le DeprecationWarning)
|
||||
self.client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
|
||||
self.client.username_pw_set(user, pwd)
|
||||
self.client.on_connect = self._on_connect
|
||||
self.client.on_message = self._on_message
|
||||
self.client.on_disconnect = self._on_disconnect
|
||||
|
||||
self.broker_host = broker_host
|
||||
self.broker_port = broker_port
|
||||
self.topics = topics # liste de tuples (topic, qos)
|
||||
self.check_every_s = check_every_s
|
||||
|
||||
# Statuts par site, déduits du préfixe: "Meudon/#" -> "Meudon"
|
||||
self.sites: dict[str, SiteStatus] = {}
|
||||
for t, _q in topics:
|
||||
site = t.split("/", 1)[0]
|
||||
self.sites[site] = SiteStatus(site, threshold_min)
|
||||
|
||||
self._stop = threading.Event()
|
||||
self._checker_thread = threading.Thread(target=self._checker_loop, daemon=True)
|
||||
|
||||
# MQTT callbacks (API v2)
|
||||
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
|
||||
if reason_code == 0:
|
||||
logging.info("Connecté au broker MQTT.")
|
||||
for t, q in self.topics:
|
||||
client.subscribe(t, qos=q)
|
||||
logging.info(f"Abonné à '{t}' (QoS {q})")
|
||||
else:
|
||||
logging.error(f"Echec connexion MQTT (reason_code={reason_code})")
|
||||
|
||||
def _on_disconnect(self, client, userdata, reason_code, properties=None):
|
||||
logging.warning(f"MQTT déconnecté (reason_code={reason_code}). Reconnexion auto gérée par loop_* si activée.")
|
||||
|
||||
def _on_message(self, client, userdata, msg):
|
||||
# Topic attendu: "Meudon/..." ou "Saclay/..."
|
||||
site = msg.topic.split("/", 1)[0]
|
||||
if site in self.sites:
|
||||
self.sites[site].seen_now()
|
||||
logging.debug(f"{site}: message reçu, mise à jour last_seen.")
|
||||
|
||||
# Thread de vérification périodique
|
||||
def _checker_loop(self):
|
||||
while not self._stop.is_set():
|
||||
for site, status in self.sites.items():
|
||||
event = status.check_and_alert()
|
||||
if event:
|
||||
kind, text = event
|
||||
if kind == "OUTAGE":
|
||||
subject = f"[ALERTE] {site} inactif > seuil"
|
||||
body = (f"Watchdog MQTT : {text}\n"
|
||||
f"Seuil: {status.threshold} | Vérif {self.check_every_s}s\n"
|
||||
f"Broker: {self.broker_host}:{self.broker_port}")
|
||||
notify(subject, body)
|
||||
logging.warning(text)
|
||||
elif kind == "RECOVERY":
|
||||
subject = f"[OK] {site} rétabli"
|
||||
body = (f"Watchdog MQTT : {text}\n"
|
||||
f"Seuil: {status.threshold} | Vérif {self.check_every_s}s\n"
|
||||
f"Broker: {self.broker_host}:{self.broker_port}")
|
||||
notify(subject, body)
|
||||
logging.info(text)
|
||||
self._stop.wait(self.check_every_s)
|
||||
|
||||
def start(self):
|
||||
self.client.connect(self.broker_host, self.broker_port, keepalive=60)
|
||||
self.client.loop_start() # thread interne MQTT + reconnexions auto
|
||||
self._checker_thread.start()
|
||||
logging.info("Watchdog démarré.")
|
||||
|
||||
def stop(self):
|
||||
self._stop.set()
|
||||
self._checker_thread.join(timeout=2)
|
||||
self.client.loop_stop()
|
||||
self.client.disconnect()
|
||||
|
||||
# ---------- Main ----------
|
||||
def parse_args():
|
||||
p = argparse.ArgumentParser(description="Watchdog MQTT par site (inactivité > seuil)")
|
||||
p.add_argument("--log", help="Chemin du fichier log (sinon stdout).")
|
||||
p.add_argument("--broker-host", default=DEFAULT_BROKER_HOST)
|
||||
p.add_argument("--broker-port", type=int, default=DEFAULT_BROKER_PORT)
|
||||
p.add_argument("--mqtt-user", default=DEFAULT_MQTT_USER)
|
||||
p.add_argument("--mqtt-pass", default=DEFAULT_MQTT_PASS)
|
||||
p.add_argument("--threshold-min", type=int, default=15, help="Seuil d'inactivité en minutes")
|
||||
p.add_argument("--check-every-s", type=int, default=60, help="Périodicité de vérification en secondes")
|
||||
p.add_argument("--topics", default="Meudon/#,Saclay/#", help="liste de topics 'Site/#' séparés par des virgules")
|
||||
return p.parse_args()
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = parse_args()
|
||||
setup_logger(args.log)
|
||||
|
||||
topics = []
|
||||
for t in [x.strip() for x in args.topics.split(",") if x.strip()]:
|
||||
topics.append((t, 1))
|
||||
|
||||
watchdog = MqttWatchdog(
|
||||
broker_host=args.broker_host,
|
||||
broker_port=args.broker_port,
|
||||
user=args.mqtt_user,
|
||||
pwd=args.mqtt_pass,
|
||||
topics=topics,
|
||||
threshold_min=args.threshold_min,
|
||||
check_every_s=args.check_every_s,
|
||||
)
|
||||
|
||||
try:
|
||||
watchdog.start()
|
||||
while True:
|
||||
time.sleep(3600)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception as e:
|
||||
logging.error(f"Erreur fatale: {e}")
|
||||
finally:
|
||||
watchdog.stop()
|
||||
logging.info("Watchdog arrêté.")
|
||||
Reference in New Issue
Block a user