#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""MQTT per-site inactivity watchdog.

Subscribes to one or more ``Site/#`` topics and tracks, per site (the first
topic level), when the last message was received. When a site stays silent
for longer than a threshold an OUTAGE notification is sent (e-mail, plus an
optional SMS webhook); when messages resume a RECOVERY notification is sent.

Configuration is read from environment variables (optionally loaded from a
``.env`` file) and can be partly overridden on the command line.
"""

import argparse
import logging
import os
import smtplib
import ssl
import sys
import threading
import time
import urllib.parse
import urllib.request
import zoneinfo
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from email.utils import formatdate

import paho.mqtt.client as mqtt
from dotenv import load_dotenv

# Must run before the os.getenv() calls below so that .env values are visible.
load_dotenv()

logger = logging.getLogger(__name__)

# ---------- Default configuration ----------
DEFAULT_BROKER_HOST = os.getenv("MQTT_HOST")
DEFAULT_BROKER_PORT = int(os.getenv("MQTT_PORT", "1883"))
DEFAULT_MQTT_USER = os.getenv("MQTT_USER")
DEFAULT_MQTT_PASS = os.getenv("MQTT_PASS")

# E-mail (e.g. OVH SMTP)
SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
SMTP_PORT = int(os.getenv("SMTP_PORT", "465"))  # 465=SSL, 587=STARTTLS
SMTP_USER = os.getenv("SMTP_USER", "")
SMTP_PASS = os.getenv("SMTP_PASS", "")
MAIL_FROM = os.getenv("MAIL_FROM", SMTP_USER or "alerte@exemple.fr")
MAIL_TO = os.getenv("MAIL_TO", "")  # "toi@domaine.fr,ops@domaine.fr"

# Optional SMS webhook (e.g. Free Mobile / OVH / other). The URL-encoded
# message text is appended to this URL, so it is expected to end with the
# message query parameter,
# e.g. https://smsapi.free-mobile.fr/sendmsg?user=XXX&pass=YYY&msg=
WEBHOOK_SMS_URL = os.getenv("WEBHOOK_SMS_URL", "")


# ---------- Helpers ----------
def setup_logger(logfile: str | None) -> None:
    """Configure root logging to stdout, and also to *logfile* when given."""
    handlers: list[logging.Handler] = []
    if logfile:
        # Log messages contain accented characters; do not depend on the
        # platform default encoding.
        handlers.append(logging.FileHandler(logfile, encoding="utf-8"))
    handlers.append(logging.StreamHandler(sys.stdout))
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=handlers,
    )


def now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def fmt_local(dt: datetime) -> str:
    """Format *dt* for display in the Europe/Paris time zone.

    Falls back to a UTC rendering when the time zone database is not
    available on the host.
    """
    try:
        tz = zoneinfo.ZoneInfo("Europe/Paris")
    except zoneinfo.ZoneInfoNotFoundError:
        # Convert explicitly so the "UTC" label is always truthful.
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z")


# ---------- Notifiers ----------
def send_email(subject: str, body: str) -> None:
    """Send a plain-text UTF-8 e-mail to every address listed in MAIL_TO.

    Uses implicit TLS when SMTP_PORT is 465 and STARTTLS otherwise. This is
    best effort: a missing configuration or a delivery failure is logged and
    never raised to the caller.
    """
    # MAIL_TO is comma-separated; tolerate spaces and stray commas.
    recipients = [addr.strip() for addr in MAIL_TO.split(",") if addr.strip()]
    if not (SMTP_HOST and SMTP_USER and SMTP_PASS and recipients and MAIL_FROM):
        logger.warning("Email non configuré (variables SMTP_* / MAIL_* manquantes).")
        return

    msg = MIMEText(body, _charset="utf-8")
    msg["Subject"] = subject
    msg["From"] = MAIL_FROM
    msg["To"] = ", ".join(recipients)
    msg["Date"] = formatdate(localtime=True)

    # One verifying context for both transports; without it STARTTLS would
    # not validate the server certificate.
    context = ssl.create_default_context()
    try:
        if SMTP_PORT == 465:
            with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, context=context, timeout=20) as server:
                server.login(SMTP_USER, SMTP_PASS)
                server.sendmail(MAIL_FROM, recipients, msg.as_string())
        else:
            with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as server:
                server.ehlo()
                server.starttls(context=context)
                server.login(SMTP_USER, SMTP_PASS)
                server.sendmail(MAIL_FROM, recipients, msg.as_string())
        logger.info("Email envoyé.")
    except Exception as e:  # best effort: a failed alert must not stop the watchdog
        logger.error("Echec envoi email: %s", e)


def send_sms_via_webhook(text: str) -> None:
    """Send *text* through the SMS webhook; no-op when WEBHOOK_SMS_URL is empty.

    Best effort: failures are logged and never raised to the caller.
    """
    if not WEBHOOK_SMS_URL:
        return
    try:
        # safe="" so that "/" in the message is percent-encoded as well.
        url = WEBHOOK_SMS_URL + urllib.parse.quote(text, safe="")
        with urllib.request.urlopen(url, timeout=10) as resp:
            resp.read()
        logger.info("SMS (webhook) envoyé.")
    except Exception as e:  # best effort, same policy as send_email
        logger.error("Echec envoi SMS webhook: %s", e)


def notify(subject: str, body: str) -> None:
    """Dispatch an alert on every configured channel (e-mail, SMS webhook)."""
    send_email(subject, body)
    # Skipped automatically when WEBHOOK_SMS_URL is not configured.
    send_sms_via_webhook(f"{subject} - {body}")
    # A Twilio notifier could be added here (function + env variables).


# ---------- Watchdog ----------
class SiteStatus:
    """Inactivity state of one site.

    Attributes:
        name: Site name (first level of the MQTT topic).
        threshold: Maximum allowed silence before an outage is reported.
        last_seen: UTC time of the last received message, or None if no
            message has been received since startup.
        alert_sent: True while an outage has been reported and not yet
            followed by a recovery.
        started_at: UTC time at which this tracker was created; used as the
            reference while no message has been received yet.
    """

    def __init__(self, name: str, threshold_min: int):
        self.name = name
        self.threshold = timedelta(minutes=threshold_min)
        self.last_seen: datetime | None = None
        self.alert_sent = False
        self.started_at = now_utc()

    def seen_now(self) -> None:
        """Record that a message for this site has just been received."""
        self.last_seen = now_utc()

    def check_and_alert(self) -> tuple[str, str] | None:
        """Return a state transition to report, if any.

        Returns:
            ``("OUTAGE", text)`` the first time the silence exceeds the
            threshold, ``("RECOVERY", text)`` the first time data is seen
            again after an outage, otherwise ``None``.
        """
        now = now_utc()
        # Snapshot: seen_now() may be called from another thread meanwhile.
        last_seen = self.last_seen
        # A site that has never published is measured from startup, so a site
        # that is already down when the watchdog starts still raises an alert
        # once the threshold has elapsed.
        reference = last_seen if last_seen is not None else self.started_at
        delta = now - reference

        if delta > self.threshold:
            if not self.alert_sent:
                self.alert_sent = True
                minutes = int(delta.total_seconds() // 60)
                if last_seen is None:
                    return ("OUTAGE",
                            f"{self.name} : aucune donnée reçue depuis le démarrage du watchdog "
                            f"({fmt_local(reference)}) (écoulé: {minutes} min).")
                return ("OUTAGE",
                        f"{self.name} : plus de données depuis {fmt_local(last_seen)} "
                        f"(écoulé: {minutes} min).")
        elif self.alert_sent:
            self.alert_sent = False
            return ("RECOVERY",
                    f"{self.name} : flux rétabli, dernier message à {fmt_local(reference)}.")
        return None


class MqttWatchdog:
    """Subscribe to site topics and alert when a site stops publishing.

    Args:
        broker_host: MQTT broker host name or address.
        broker_port: MQTT broker TCP port.
        user: MQTT user name (may be None).
        pwd: MQTT password (may be None).
        topics: List of ``(topic, qos)`` tuples; the site name is the part of
            the topic before the first ``/``.
        threshold_min: Inactivity threshold in minutes, applied to each site.
        check_every_s: Interval in seconds between two inactivity checks.
    """

    def __init__(self, broker_host, broker_port, user, pwd, topics, threshold_min, check_every_s):
        # Callback API v2 (avoids the DeprecationWarning of the v1 API).
        self.client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
        self.client.username_pw_set(user, pwd)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

        self.broker_host = broker_host
        self.broker_port = broker_port
        self.topics = topics  # list of (topic, qos) tuples
        self.check_every_s = check_every_s

        # One status per site, derived from the topic prefix: "Meudon/#" -> "Meudon".
        self.sites: dict[str, SiteStatus] = {}
        for topic, _qos in topics:
            site = topic.split("/", 1)[0]
            self.sites.setdefault(site, SiteStatus(site, threshold_min))

        self._stop = threading.Event()
        self._checker_thread = threading.Thread(target=self._checker_loop, daemon=True)

    # MQTT callbacks (API v2)
    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        """Subscribe to every configured topic once the broker accepts us."""
        if reason_code == 0:
            logger.info("Connecté au broker MQTT.")
            for topic, qos in self.topics:
                client.subscribe(topic, qos=qos)
                logger.info("Abonné à '%s' (QoS %s)", topic, qos)
        else:
            logger.error("Echec connexion MQTT (reason_code=%s)", reason_code)

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties=None):
        """Log a disconnection.

        With callback API v2 paho passes ``disconnect_flags`` before
        ``reason_code``; omitting it makes the callback raise TypeError on
        every disconnect.
        """
        logger.warning(
            "MQTT déconnecté (reason_code=%s). Reconnexion auto gérée par loop_* si activée.",
            reason_code,
        )

    def _on_message(self, client, userdata, msg):
        """Refresh ``last_seen`` for the site the message belongs to."""
        # Expected topic: "Meudon/..." or "Saclay/..."
        site = msg.topic.split("/", 1)[0]
        status = self.sites.get(site)
        if status is not None:
            status.seen_now()
            logger.debug("%s: message reçu, mise à jour last_seen.", site)

    def _report(self, site: str, status: SiteStatus) -> None:
        """Check one site and send a notification if its state changed."""
        event = status.check_and_alert()
        if event is None:
            return
        kind, text = event
        body = (f"Watchdog MQTT : {text}\n"
                f"Seuil: {status.threshold} | Vérif {self.check_every_s}s\n"
                f"Broker: {self.broker_host}:{self.broker_port}")
        if kind == "OUTAGE":
            notify(f"[ALERTE] {site} inactif > seuil", body)
            logger.warning(text)
        elif kind == "RECOVERY":
            notify(f"[OK] {site} rétabli", body)
            logger.info(text)

    # Periodic checking thread
    def _checker_loop(self):
        """Check every site each ``check_every_s`` seconds until stopped."""
        while not self._stop.is_set():
            for site, status in self.sites.items():
                try:
                    self._report(site, status)
                except Exception:
                    # Thread boundary: an unexpected error on one site must
                    # not silently end monitoring for all of them.
                    logger.exception("Erreur lors de la vérification du site %s", site)
            # Event.wait doubles as an interruptible sleep for stop().
            self._stop.wait(self.check_every_s)

    def start(self):
        """Connect to the broker and start the MQTT and checker threads.

        Raises whatever ``connect()`` raises when the broker is unreachable.
        """
        self.client.connect(self.broker_host, self.broker_port, keepalive=60)
        self.client.loop_start()  # paho's own network thread
        self._checker_thread.start()
        logger.info("Watchdog démarré.")

    def stop(self):
        """Stop the checker thread and disconnect; safe if start() failed."""
        self._stop.set()
        # join() raises RuntimeError on a thread that was never started,
        # which happens when start() failed while connecting.
        if self._checker_thread.is_alive():
            self._checker_thread.join(timeout=2)
        self.client.loop_stop()
        self.client.disconnect()


# ---------- Main ----------
def parse_args():
    """Parse and validate the command-line arguments."""
    p = argparse.ArgumentParser(description="Watchdog MQTT par site (inactivité > seuil)")
    p.add_argument("--log", help="Chemin du fichier log (sinon stdout).")
    p.add_argument("--broker-host", default=DEFAULT_BROKER_HOST)
    p.add_argument("--broker-port", type=int, default=DEFAULT_BROKER_PORT)
    p.add_argument("--mqtt-user", default=DEFAULT_MQTT_USER)
    p.add_argument("--mqtt-pass", default=DEFAULT_MQTT_PASS)
    p.add_argument("--threshold-min", type=int, default=15,
                   help="Seuil d'inactivité en minutes")
    p.add_argument("--check-every-s", type=int, default=60,
                   help="Périodicité de vérification en secondes")
    p.add_argument("--topics", default="Meudon/#,Saclay/#",
                   help="liste de topics 'Site/#' séparés par des virgules")
    args = p.parse_args()

    # Fail early with a clear message rather than deep inside the MQTT client
    # or in a busy-looping checker thread.
    if not args.broker_host:
        p.error("hôte du broker MQTT manquant (--broker-host ou variable MQTT_HOST)")
    if args.threshold_min <= 0:
        p.error("--threshold-min doit être > 0")
    if args.check_every_s <= 0:
        p.error("--check-every-s doit être > 0")
    if not any(t.strip() for t in args.topics.split(",")):
        p.error("--topics ne contient aucun topic")
    return args


def main() -> int:
    """Run the watchdog until interrupted; return the process exit status."""
    args = parse_args()
    setup_logger(args.log)

    # Every topic is subscribed with QoS 1.
    topics = [(t.strip(), 1) for t in args.topics.split(",") if t.strip()]

    watchdog = MqttWatchdog(
        broker_host=args.broker_host,
        broker_port=args.broker_port,
        user=args.mqtt_user,
        pwd=args.mqtt_pass,
        topics=topics,
        threshold_min=args.threshold_min,
        check_every_s=args.check_every_s,
    )

    exit_code = 0
    try:
        watchdog.start()
        # All the work happens in background threads; just keep the main
        # thread alive until Ctrl-C.
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error("Erreur fatale: %s", e)
        exit_code = 1
    finally:
        watchdog.stop()
        logger.info("Watchdog arrêté.")
    return exit_code


if __name__ == "__main__":
    sys.exit(main())