Files
Gestion_sondes/app/mqtt_watchdog.py

254 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
import smtplib
from email.mime.text import MIMEText
from email.utils import formatdate
import threading
import paho.mqtt.client as mqtt
from dotenv import load_dotenv; load_dotenv()
# ---------- Default configuration ----------
# MQTT broker connection; each value can be overridden on the command line.
DEFAULT_BROKER_HOST = os.environ.get("MQTT_HOST")
DEFAULT_BROKER_PORT = int(os.environ.get("MQTT_PORT", "1883"))
DEFAULT_MQTT_USER = os.environ.get("MQTT_USER")
DEFAULT_MQTT_PASS = os.environ.get("MQTT_PASS")
# Email alerts (defaults target the OVH SMTP relay).
SMTP_HOST = os.environ.get("SMTP_HOST", "ssl0.ovh.net")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "465"))  # 465 = implicit SSL, any other port = STARTTLS
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
# Sender address: MAIL_FROM, else SMTP_USER, else a placeholder.
MAIL_FROM = os.environ.get("MAIL_FROM", SMTP_USER or "alerte@exemple.fr")
# Comma-separated recipient list, e.g. "toi@domaine.fr,ops@domaine.fr".
MAIL_TO = os.environ.get("MAIL_TO", "")
# Optional SMS gateway: the URL-quoted message is appended verbatim to this URL,
# e.g. https://smsapi.free-mobile.fr/sendmsg?user=XXX&pass=YYY&msg=
WEBHOOK_SMS_URL = os.environ.get("WEBHOOK_SMS_URL", "")
# ---------- Helpers ----------
def setup_logger(logfile: str | None):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[logging.StreamHandler(sys.stdout)] if not logfile else [
logging.FileHandler(logfile),
logging.StreamHandler(sys.stdout)
],
)
def now_utc():
    """Return the current instant as a timezone-aware UTC datetime."""
    utc = timezone.utc
    return datetime.now(tz=utc)
def fmt_local(dt: datetime):
    """Format *dt* for humans, in Europe/Paris local time when possible.

    Falls back to UTC when the Europe/Paris zone cannot be loaded (for example
    on a system without time-zone data).

    :param dt: an aware datetime (the watchdog passes UTC timestamps).
    :return: a ``"YYYY-MM-DD HH:MM:SS TZ"`` string.
    """
    try:
        from zoneinfo import ZoneInfo
        tz = ZoneInfo("Europe/Paris")
    except Exception:  # ImportError, ZoneInfoNotFoundError, unreadable tz data...
        # Best-effort fallback: convert to UTC first so the "UTC" label is
        # truthful even when *dt* carries another offset.
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
# ---------- Notifiers ----------
def split_recipients(raw: str) -> list[str]:
    """Split a comma-separated address list, dropping whitespace and blanks.

    >>> split_recipients("a@x.fr, b@x.fr,")
    ['a@x.fr', 'b@x.fr']
    """
    return [addr.strip() for addr in raw.split(",") if addr.strip()]


def send_email(subject: str, body: str):
    """Send a plain-text UTF-8 alert email to every address in MAIL_TO.

    Uses implicit TLS when SMTP_PORT is 465 and STARTTLS otherwise. In both
    cases the server certificate and host name are verified. When the SMTP_* /
    MAIL_* settings are incomplete, only a warning is logged. Delivery errors
    are logged, never raised, so a mail problem cannot stop the watchdog.
    """
    import ssl

    recipients = split_recipients(MAIL_TO)
    if not (SMTP_HOST and SMTP_USER and SMTP_PASS and recipients and MAIL_FROM):
        logging.warning("Email non configuré (variables SMTP_* / MAIL_* manquantes).")
        return
    msg = MIMEText(body, _charset="utf-8")
    msg["Subject"] = subject
    msg["From"] = MAIL_FROM
    msg["To"] = ", ".join(recipients)
    msg["Date"] = formatdate(localtime=True)
    payload = msg.as_string()
    try:
        # One verifying context for both modes: smtplib's starttls() without
        # an explicit context does not check the server certificate, which
        # would expose SMTP_PASS to a man-in-the-middle.
        context = ssl.create_default_context()
        if SMTP_PORT == 465:
            with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, context=context, timeout=20) as server:
                server.login(SMTP_USER, SMTP_PASS)
                refused = server.sendmail(MAIL_FROM, recipients, payload)
        else:
            with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as server:
                server.ehlo()
                server.starttls(context=context)
                server.login(SMTP_USER, SMTP_PASS)
                refused = server.sendmail(MAIL_FROM, recipients, payload)
    except Exception as e:
        logging.error(f"Echec envoi email: {e}")
        return
    # sendmail() only raises when *every* recipient is refused; report partial failures.
    if refused:
        logging.warning(f"Email refusé pour: {', '.join(refused)}")
    logging.info("Email envoyé.")
def send_sms_via_webhook(text: str):
    """Send *text* through the optional SMS webhook (best effort).

    The text is URL-quoted and appended to WEBHOOK_SMS_URL, and the resulting
    URL is fetched with a 10-second timeout. Nothing happens when the webhook
    is not configured; failures are logged, never raised.
    """
    if WEBHOOK_SMS_URL:
        try:
            import urllib.parse
            import urllib.request

            target = f"{WEBHOOK_SMS_URL}{urllib.parse.quote(text)}"
            with urllib.request.urlopen(target, timeout=10) as response:
                response.read()
            logging.info("SMS (webhook) envoyé.")
        except Exception as e:
            logging.error(f"Echec envoi SMS webhook: {e}")
def notify(subject: str, body: str):
    """Dispatch an alert on every channel: email first, then the SMS webhook.

    The SMS channel receives ``"<subject> - <body>"``. Each sender deals with
    its own missing configuration and errors, so a broken channel does not
    prevent the other one from being tried.
    """
    sms_text = f"{subject} - {body}"
    send_email(subject, body)
    send_sms_via_webhook(sms_text)
    # Other channels (e.g. Twilio) can be added here once their sender
    # function and environment variables exist.
# ---------- Watchdog ----------
class SiteStatus:
    """Inactivity tracker for one site (one MQTT topic prefix).

    :meth:`seen_now` is called for every incoming message of the site, and
    :meth:`check_and_alert` is polled periodically. The latter reports at most
    one OUTAGE per silence and one RECOVERY when traffic resumes.
    """

    def __init__(self, name: str, threshold_min: int):
        self.name = name
        self.threshold = timedelta(minutes=threshold_min)
        # Time of the most recent message; None until the first one arrives.
        self.last_seen: datetime | None = None
        # True while an OUTAGE has been reported and not yet followed by a RECOVERY.
        self.alert_sent = False
        # Reference point while nothing has been received yet, so a site that
        # is already silent when the watchdog starts still triggers an alert
        # once the threshold has elapsed.
        self.started_at: datetime = now_utc()

    def seen_now(self):
        """Record that a message for this site has just arrived."""
        self.last_seen = now_utc()

    def check_and_alert(self):
        """Compare the current silence with the threshold.

        :return: ``("OUTAGE", text)`` the first time the silence exceeds the
            threshold; ``("RECOVERY", text)`` on the first check after
            traffic resumes following an OUTAGE; ``None`` otherwise.
        """
        now = now_utc()
        # Before the first message, measure the silence from watchdog start-up.
        reference = self.last_seen if self.last_seen is not None else self.started_at
        delta = now - reference
        if delta > self.threshold:
            if self.alert_sent:
                return None  # this outage has already been reported
            self.alert_sent = True
            elapsed_min = int(delta.total_seconds() // 60)
            if self.last_seen is None:
                return ("OUTAGE",
                        f"{self.name} : aucune donnée reçue depuis le démarrage du watchdog "
                        f"({fmt_local(self.started_at)}) (écoulé: {elapsed_min} min).")
            return ("OUTAGE",
                    f"{self.name} : plus de données depuis {fmt_local(self.last_seen)} "
                    f"(écoulé: {elapsed_min} min).")
        # Within the threshold: a recovery needs an actual message to report.
        if self.alert_sent and self.last_seen is not None:
            self.alert_sent = False
            return ("RECOVERY",
                    f"{self.name} : flux rétabli, dernier message à {fmt_local(self.last_seen)}.")
        return None
class MqttWatchdog:
    """Per-site MQTT inactivity watchdog.

    Subscribes to the configured topics and keeps one :class:`SiteStatus` per
    topic prefix (``"Meudon/#"`` -> ``"Meudon"``), updated from incoming
    messages. A daemon thread checks every site each ``check_every_s``
    seconds and sends OUTAGE / RECOVERY notifications.
    """

    def __init__(self, broker_host, broker_port, user, pwd, topics, threshold_min, check_every_s):
        """Build the MQTT client and per-site trackers; connecting happens in :meth:`start`.

        :param topics: list of ``(topic, qos)`` tuples; the site name is the
            topic segment before the first ``/``.
        :param threshold_min: inactivity threshold in minutes, shared by all sites.
        :param check_every_s: period of the checker thread, in seconds.
        """
        # Callback API v2 (avoids paho's DeprecationWarning)
        self.client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
        self.client.username_pw_set(user, pwd)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.topics = topics  # list of (topic, qos) tuples
        self.check_every_s = check_every_s
        # One status per site, derived from the topic prefix: "Meudon/#" -> "Meudon"
        self.sites: dict[str, SiteStatus] = {}
        for t, _q in topics:
            site = t.split("/", 1)[0]
            self.sites[site] = SiteStatus(site, threshold_min)
        self._stop = threading.Event()
        self._checker_thread = threading.Thread(target=self._checker_loop, daemon=True)

    # MQTT callbacks (API v2)
    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        """Subscribe to every topic on each successful (re)connection."""
        if reason_code == 0:
            logging.info("Connecté au broker MQTT.")
            for t, q in self.topics:
                client.subscribe(t, qos=q)
                logging.info(f"Abonné à '{t}' (QoS {q})")
        else:
            logging.error(f"Echec connexion MQTT (reason_code={reason_code})")

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties=None):
        """Log a disconnection; the loop started by :meth:`start` handles reconnection."""
        # Callback API v2 passes (client, userdata, disconnect_flags, reason_code,
        # properties): the reason code is the 4th argument, after the flags.
        logging.warning(f"MQTT déconnecté (reason_code={reason_code}). Reconnexion auto gérée par loop_* si activée.")

    def _on_message(self, client, userdata, msg):
        """Refresh ``last_seen`` of the site named by the topic's first segment."""
        # Expected topic: "Meudon/..." or "Saclay/..."
        site = msg.topic.split("/", 1)[0]
        if site in self.sites:
            self.sites[site].seen_now()
            logging.debug(f"{site}: message reçu, mise à jour last_seen.")

    # Periodic checker thread
    def _checker_loop(self):
        """Evaluate every site each ``check_every_s`` seconds until stopped."""
        while not self._stop.is_set():
            for site, status in self.sites.items():
                try:
                    event = status.check_and_alert()
                    if event:
                        kind, text = event
                        self._report(site, status, kind, text)
                except Exception:
                    # Keep the thread alive: if it died, every future alert
                    # would be silently lost.
                    logging.exception(f"{site}: erreur pendant la vérification.")
            self._stop.wait(self.check_every_s)

    def _report(self, site, status, kind, text):
        """Send the notification and log line for an OUTAGE or RECOVERY event."""
        if kind == "OUTAGE":
            subject = f"[ALERTE] {site} inactif > seuil"
            log = logging.warning
        elif kind == "RECOVERY":
            subject = f"[OK] {site} rétabli"
            log = logging.info
        else:
            return
        body = (f"Watchdog MQTT : {text}\n"
                f"Seuil: {status.threshold} | Vérif {self.check_every_s}s\n"
                f"Broker: {self.broker_host}:{self.broker_port}")
        notify(subject, body)
        log(text)

    def start(self):
        """Connect to the broker, then start the MQTT loop and the checker thread."""
        self.client.connect(self.broker_host, self.broker_port, keepalive=60)
        self.client.loop_start()  # internal MQTT thread + automatic reconnection
        self._checker_thread.start()
        logging.info("Watchdog démarré.")

    def stop(self):
        """Stop the checker thread and the MQTT loop.

        Safe to call even when :meth:`start` failed or was never called.
        """
        self._stop.set()
        # Thread.join() raises RuntimeError on a thread that was never started
        # (e.g. when connect() failed inside start()).
        if self._checker_thread.is_alive():
            self._checker_thread.join(timeout=2)
        # Disconnect while the network loop is still running, then stop it.
        self.client.disconnect()
        self.client.loop_stop()
# ---------- Main ----------
def positive_int(value: str) -> int:
    """argparse ``type=`` converter accepting only integers >= 1.

    A zero or negative check period would turn the checker loop into a busy
    loop, and a zero or negative threshold would flag every site on every check.
    """
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"entier attendu: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"doit être >= 1: {number}")
    return number


def parse_args(argv=None):
    """Parse the command line; defaults come from the environment (.env).

    :param argv: arguments to parse; ``None`` means ``sys.argv[1:]``.
    :return: the parsed :class:`argparse.Namespace`.
    """
    p = argparse.ArgumentParser(description="Watchdog MQTT par site (inactivité > seuil)")
    p.add_argument("--log", help="Chemin du fichier log (sinon stdout).")
    # Without MQTT_HOST in the environment the host must be given explicitly,
    # rather than failing later inside connect().
    p.add_argument("--broker-host", default=DEFAULT_BROKER_HOST, required=not DEFAULT_BROKER_HOST)
    p.add_argument("--broker-port", type=int, default=DEFAULT_BROKER_PORT)
    p.add_argument("--mqtt-user", default=DEFAULT_MQTT_USER)
    # Prefer MQTT_PASS in the environment: command-line arguments are visible
    # to other local users in the process list.
    p.add_argument("--mqtt-pass", default=DEFAULT_MQTT_PASS)
    p.add_argument("--threshold-min", type=positive_int, default=15, help="Seuil d'inactivité en minutes")
    p.add_argument("--check-every-s", type=positive_int, default=60, help="Périodicité de vérification en secondes")
    p.add_argument("--topics", default="Meudon/#,Saclay/#", help="liste de topics 'Site/#' séparés par des virgules")
    return p.parse_args(argv)
if __name__ == "__main__":
    import signal

    args = parse_args()
    setup_logger(args.log)

    # Each comma-separated entry becomes a QoS 1 subscription.
    topics = [(t, 1) for t in (x.strip() for x in args.topics.split(",")) if t]
    if not topics:
        logging.error("Aucun topic à surveiller (--topics vide).")
        sys.exit(2)

    # Service managers usually stop processes with SIGTERM; turn it into
    # SystemExit so the finally clause below still shuts the watchdog down.
    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))

    watchdog = MqttWatchdog(
        broker_host=args.broker_host,
        broker_port=args.broker_port,
        user=args.mqtt_user,
        pwd=args.mqtt_pass,
        topics=topics,
        threshold_min=args.threshold_min,
        check_every_s=args.check_every_s,
    )
    exit_code = 0
    try:
        watchdog.start()
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        # Keep the traceback and report failure to the supervisor.
        logging.exception(f"Erreur fatale: {e}")
        exit_code = 1
    finally:
        watchdog.stop()
        logging.info("Watchdog arrêté.")
    sys.exit(exit_code)