#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Temperature monitor + alerts + MQTT beacon (gyrophare) + notifications
(OVH SMS + SMTP).

Revised version for a single VPS, with more robust DB / config handling.

Principles:
- DB_HOST / MQTT_HOST / SMTP_HOST are unique (single VPS).
- Per-site parameters come from the environment: MAIL_TO_{SITE},
  ALERT_SMS_TO_{SITE}, etc.
- Alerts ONLY concern probes listed in Chambres_froides for the site, with
  Etat=ON and En_entretien=0.
"""

import datetime as dt
import enum
import logging
import os
import re
import smtplib
import ssl
import threading
import time
from datetime import datetime
from email.message import EmailMessage
from zoneinfo import ZoneInfo

from dotenv import load_dotenv, find_dotenv

# ========= .env (loaded once) =========
load_dotenv(find_dotenv(usecwd=True), override=False)

# ========= Project utils =========
# NOTE(review): imported after load_dotenv() as in the original layout;
# presumably utils_sms may read the environment at import time - confirm.
from utils_sms import normaliser_sms

# ========= MySQL =========
import mysql.connector
from mysql.connector import Error as MySQLError

# ========= OVH (SMS) =========
try:
    import ovh
    from ovh.exceptions import APIError as OVHAPIError
    _ovh_available = True
except Exception:
    ovh = None  # type: ignore

    class OVHAPIError(Exception):
        """Placeholder so `except OVHAPIError` stays valid without the ovh package."""

    _ovh_available = False

# ========= MQTT =========
try:
    import paho.mqtt.client as mqtt
    _mqtt_ok = True
except Exception:
    _mqtt_ok = False


# ========= Env helpers =========
def _env_bool(name: str, default: bool) -> bool:
    """Read a boolean env var; "1", "true", "yes", "on" (any case) mean True."""
    v = os.getenv(name, str(int(default))).strip().lower()
    return v in ("1", "true", "yes", "on")


def _site_env(prefix: str, site: str) -> str | None:
    """Return the first non-empty of PREFIX_{site}, PREFIX_{SITE}, PREFIX."""
    return (
        os.getenv(f"{prefix}_{site}")
        or os.getenv(f"{prefix}_{site.upper()}")
        or os.getenv(prefix)
    )


def _split_list(raw: str | None) -> list[str]:
    """Split a ';' or ',' separated string into stripped, non-empty items."""
    return [x.strip() for x in re.split(r"[;,]", raw or "") if x.strip()]


def _parse_labeled_phones(raw: str | None) -> list[tuple[str, str]]:
    """Parse "name:number;number;..." into (name, number) pairs.

    Entries without a ':' get an empty name.
    """
    out: list[tuple[str, str]] = []
    for tok in _split_list(raw):
        if ":" in tok:
            name, num = tok.split(":", 1)
            out.append((name.strip(), num.strip()))
        else:
            out.append(("", tok))
    return out


def _filter_labeled(labeled: list[tuple[str, str]],
                    only: str | None) -> list[tuple[str, str]]:
    """Keep entries whose name or number is in the `only` allow-list.

    An empty/missing allow-list means "no filtering".
    """
    if not only:
        return labeled
    allow = set(_split_list(only))
    return [(n, p) for (n, p) in labeled if (n and n in allow) or (p in allow)]


def _resolve_sms_receivers(labeled: list[tuple[str, str]]) -> list[str]:
    """Return the phone numbers to use, honouring ALERT_SMS_ONLY if set."""
    return [num for (_n, num) in _filter_labeled(labeled, os.getenv("ALERT_SMS_ONLY"))]


def _human_labeled_list(labeled: list[tuple[str, str]]) -> str:
    """Format pairs as "name(number), number, ..." for log output."""
    return ", ".join([f"{n}({p})" if n else p for n, p in labeled])


def _gyro_sonde_name() -> str:
    """Probe name used for rows written to the Gyro table."""
    return os.getenv("GYRO_SONDE_NAME", "Gyro")


# ========= Site / table name hardening =========
_ALLOWED_SITE_RE = re.compile(r"^[A-Za-z0-9_]+$")


def safe_site(site: str) -> str:
    """Validate a site name that is later interpolated into SQL table names.

    Raises:
        ValueError: if the name is empty or contains anything other than
            ASCII letters, digits and underscores.
    """
    site = (site or "").strip()
    if not site or not _ALLOWED_SITE_RE.fullmatch(site):
        raise ValueError(f"Nom de site invalide: {site!r}")
    return site


# ========= Timezone =========
PARIS = ZoneInfo("Europe/Paris")


def now_paris() -> dt.datetime:
    """Current timezone-aware datetime in Europe/Paris."""
    return dt.datetime.now(tz=PARIS)


def fmt_deg(v: float) -> str:
    """Format a temperature with one decimal and a decimal comma, e.g. "-14,5°C"."""
    s = f"{float(v):.1f}".replace(".", ",")
    return f"{s}°C"


# ========= Site (from env, default Meudon) =========
SITE = safe_site(os.getenv("SITE", "Meudon"))
PROGRAM_NAME = f"Monitor_{SITE}"

# ========= Logger =========
level = getattr(logging, os.getenv("LOGLEVEL", "INFO").upper(), logging.INFO)
log = logging.getLogger(PROGRAM_NAME.lower())
if not log.handlers:
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")


# ========= DB connection =========
def get_db():
    """Open a new MySQL connection (single VPS).

    The host does not depend on the site; only table names change
    (e.g. `Meudon`, `Alertes_Meudon`). Autocommit is off: callers commit.
    """
    return mysql.connector.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASS"),
        database=os.getenv("DB_NAME", "Sondes"),
        port=int(os.getenv("DB_PORT", "3306")),
        autocommit=False,
    )


# ========= Alerts in DB =========
def open_alert(conn, table_alertes: str, sonde: str, dt_: datetime) -> bool:
    """Open ONE alert unless an 'En cours' alert already exists for the probe.

    `table_alertes` is interpolated into the SQL text, so it must come from a
    validated site name (see safe_site).

    Returns:
        True if a new alert row was inserted and committed.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"SELECT Id FROM `{table_alertes}` WHERE Sonde=%s AND Etat='En cours' "
            "ORDER BY Debut_defaut DESC LIMIT 1",
            (sonde,),
        )
        if cur.fetchone():
            return False
        cur.execute(
            f"INSERT INTO `{table_alertes}` (Sonde, Debut_defaut, Etat) "
            "VALUES (%s, %s, 'En cours')",
            (sonde, dt_.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return True
    finally:
        cur.close()


def close_alert(conn, table_alertes: str, sonde: str) -> bool:
    """Close the most recent 'En cours' alert for the probe, if any.

    Returns:
        True if exactly one alert row was switched to 'Acquitté'.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"SELECT Id FROM `{table_alertes}` WHERE Sonde=%s AND Etat='En cours' "
            "ORDER BY Debut_defaut DESC LIMIT 1",
            (sonde,),
        )
        row = cur.fetchone()
        if not row:
            return False
        alert_id = int(row[0])
        cur.execute(
            f"UPDATE `{table_alertes}` SET Etat='Acquitté' WHERE Id=%s",
            (alert_id,),
        )
        changed = (cur.rowcount == 1)
        conn.commit()
        return changed
    finally:
        cur.close()


# ========= Gyro DB (journal) =========
def should_insert_gyro(lieu: str, etat: str, sonde: str = "Gyro") -> bool:
    """Return True when the latest Gyro row for (lieu, sonde) is missing or differs from `etat`."""
    dbname = os.getenv("DB_NAME", "Sondes")
    sql = (
        f"SELECT Etat FROM `{dbname}`.`Gyro` WHERE Lieu=%s AND Sonde=%s "
        "ORDER BY Date DESC LIMIT 1"
    )
    cnx = get_db()
    try:
        cur = cnx.cursor()
        cur.execute(sql, (lieu, sonde))
        row = cur.fetchone()
        return (row is None) or (row[0] != etat)
    finally:
        cnx.close()


def insert_gyro_log(lieu: str, etat: str, topic: str, payload_raw: str,
                    qos: int | None, retained: int | None, when: datetime):
    """Append one row to the Gyro table.

    MySQL errors are logged and swallowed (journaling is best-effort); errors
    raised while opening the connection propagate to the caller.
    """
    dbname = os.getenv("DB_NAME", "Sondes")
    cnx = get_db()
    try:
        cur = cnx.cursor()
        cur.execute(
            f"INSERT INTO `{dbname}`.`Gyro` (Lieu, Sonde, Etat, Date, Topic, Payload, QoS, Retained) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
            (
                lieu,
                _gyro_sonde_name(),
                etat,
                when.strftime("%Y-%m-%d %H:%M:%S"),
                topic,
                payload_raw,
                qos,
                retained,
            ),
        )
        cnx.commit()
        log.info("Gyro inséré: %s %s (%s)", lieu, etat, topic)
    except MySQLError as err:
        log.exception("Erreur DB insert_gyro_log: %s", err)
    finally:
        cnx.close()


# ========= Reading temperatures / config =========
def lire_sondes_depuis_db(site: str):
    """Return the latest measurement per probe, ignoring NULL temperatures.

    Returns:
        A list of dict rows with keys Sonde, Temperature (float) and Date.
        An empty list is returned when the query fails with a MySQL error.
    """
    table = safe_site(site)
    sql = f"""
        SELECT t1.Sonde, t1.Temperature, t1.Date
        FROM `{table}` t1
        JOIN (
            SELECT Sonde, MAX(Date) AS MaxDate
            FROM `{table}`
            WHERE Temperature IS NOT NULL
            GROUP BY Sonde
        ) t2 ON t1.Sonde=t2.Sonde AND t1.Date=t2.MaxDate
        WHERE t1.Temperature IS NOT NULL
    """
    cnx = get_db()
    try:
        cur = cnx.cursor(dictionary=True)
        cur.execute(sql)
        rows = cur.fetchall()
        for r in rows:
            r["Temperature"] = float(r["Temperature"])
        return rows
    except MySQLError as err:
        log.exception("Erreur DB (lire_sondes_depuis_db): %s", err)
        return []
    finally:
        cnx.close()


def lire_cfg_chambres(site: str):
    """Read the per-probe configuration of a site from Chambres_froides.

    Returns:
        {sonde: {"temp_max": float, "active": bool, "entretien": bool}}.
        Rows whose Temp_Max is NULL are skipped (with a warning) instead of
        aborting the whole read. On a MySQL error, whatever was collected so
        far (possibly nothing) is returned.
    """
    dbname = os.getenv("DB_NAME", "Sondes")
    sql = f"""
        SELECT Sonde, Temp_Max, Etat, En_entretien
        FROM `{dbname}`.`Chambres_froides`
        WHERE Lieu=%s
    """
    cnx = get_db()
    cfg: dict[str, dict] = {}
    try:
        cur = cnx.cursor()
        cur.execute(sql, (site,))
        for sonde, temp_max, etat, en_entretien in cur.fetchall():
            if temp_max is None:
                # Without a threshold the probe cannot be evaluated; one bad
                # row must not disable monitoring of every other probe.
                log.warning("Chambres_froides: Temp_Max NULL pour %s (%s), sonde ignorée",
                            sonde, site)
                continue
            cfg[str(sonde)] = {
                "temp_max": float(temp_max),
                "active": str(etat).upper() == "ON",
                "entretien": bool(int(en_entretien or 0)),
            }
        return cfg
    except MySQLError as err:
        log.exception("Erreur DB (lire_cfg_chambres): %s", err)
        return cfg
    finally:
        cnx.close()


def compute_site_alarm(last_values: list[dict], cfg: dict[str, dict], hysteresis: float = 0.0):
    """Tell whether any monitored probe currently exceeds its threshold.

    Only probes present in `cfg`, active and not under maintenance are
    considered. The first offending probe (in `last_values` order) wins.

    Returns:
        (is_on, trigger) where trigger is (sonde, temp, seuil) or None.
    """
    margin = float(hysteresis)
    for row in last_values:
        sonde = str(row["Sonde"])
        meta = cfg.get(sonde)
        if not meta:
            continue
        if (not meta["active"]) or meta["entretien"]:
            continue
        temp = float(row["Temperature"])
        seuil = float(meta["temp_max"])
        if temp > seuil + margin:
            return True, (sonde, temp, seuil)
    return False, None


def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool:
    """Return True if the probe has been CONTINUOUSLY above `seuil` for CONT_MIN minutes.

    CONT_MIN = ALERT_CONTINUOUS_MINUTES (default 30)
    LOOKBACK = ALERT_LOOKBACK_MINUTES (default max(60, CONT_MIN*3))

    Assumption: dates stored in the DB and the VPS clock are consistent
    (same server timezone). Returns False on a MySQL query error.
    """
    CONT_MIN = int(os.getenv("ALERT_CONTINUOUS_MINUTES", "30"))
    LOOKBACK = int(os.getenv("ALERT_LOOKBACK_MINUTES", str(max(60, CONT_MIN * 3))))
    table = safe_site(site)
    seuil = float(seuil)
    cnx = get_db()
    try:
        cur = cnx.cursor()
        cur.execute(f"""
            SELECT Temperature, Date
            FROM `{table}`
            WHERE Sonde=%s
              AND Temperature IS NOT NULL
              AND Date >= (NOW() - INTERVAL %s MINUTE)
            ORDER BY Date DESC
        """, (sonde, LOOKBACK))
        rows = cur.fetchall()
        if not rows:
            return False
        last_temp, last_dt = float(rows[0][0]), rows[0][1]
        if last_temp <= seuil:
            return False
        # Walk back in time to find the start of the continuous > seuil run.
        start_dt = last_dt
        for temp, d in rows[1:]:
            if float(temp) > seuil:
                start_dt = d
            else:
                break
        # Compared against the local (VPS) clock: consistent with the DB's
        # NOW() as long as both use the same server timezone.
        now_local = dt.datetime.now()
        dur_min = (now_local - start_dt).total_seconds() / 60.0
        log.debug("Seq>seuil %s: start=%s, now=%s, dur=%.1fmin, need>=%d",
                  sonde, start_dt, now_local, dur_min, CONT_MIN)
        return dur_min >= CONT_MIN
    except MySQLError as err:
        log.exception("Erreur DB (depassement_depuis_30min): %s", err)
        return False
    finally:
        cnx.close()


# ========= Notifier (OVH SMS + SMTP) =========
class Notifier:
    """Sends internal SMS, client SMS (both via OVH) and e-mails (SMTP) for one site.

    All settings are read from the environment when the instance is built.
    """

    def __init__(self, site: str):
        self.site = site

        # OVH SMS: enabled only if the package is importable and every
        # credential is present.
        self.ovh_enabled = _ovh_available and all(
            os.getenv(k) for k in (
                "OVH_APPLICATION_KEY", "OVH_APPLICATION_SECRET",
                "OVH_CONSUMER_KEY", "OVH_SMS_SERVICE", "OVH_SMS_SENDER",
            )
        )
        if self.ovh_enabled:
            self.ovh_client = ovh.Client(
                endpoint=os.getenv("OVH_ENDPOINT", "ovh-eu"),
                application_key=os.getenv("OVH_APPLICATION_KEY"),
                application_secret=os.getenv("OVH_APPLICATION_SECRET"),
                consumer_key=os.getenv("OVH_CONSUMER_KEY"),
            )
            self.ovh_service = os.getenv("OVH_SMS_SERVICE")
            self.ovh_sender = os.getenv("OVH_SMS_SENDER")
            self.sms_labeled = _parse_labeled_phones(_site_env("ALERT_SMS_TO", site))
        else:
            # Define the attributes anyway so the instance is always complete.
            self.ovh_client = None
            self.ovh_service = None
            self.ovh_sender = None
            self.sms_labeled = []

        # Client SMS (legacy ALERTE_CLIENT* variables are still honoured).
        raw_sms_client = (
            _site_env("ALERT_SMS_CLIENT_TO", site)
            or os.getenv(f"ALERTE_CLIENT_{site}")
            or os.getenv("ALERTE_CLIENT")
        )
        self.sms_client_labeled = _parse_labeled_phones(raw_sms_client)
        # Parsed with _env_bool, like the same flag in notifier_sur_depassement.
        self.sms_client_enabled = _env_bool("ALERT_SMS_CLIENT_ENABLED", True)

        # SMTP
        self.smtp_host = os.getenv("SMTP_HOST")
        self.smtp_port = int(os.getenv("SMTP_PORT", "465"))
        self.smtp_user = os.getenv("SMTP_USER")
        self.smtp_pass = os.getenv("SMTP_PASS")
        self.smtp_security = (os.getenv("SMTP_SECURITY", "SSL") or "SSL").upper()
        self.mail_to = _split_list(_site_env("MAIL_TO", site) or "")
        self.mail_from = _site_env("MAIL_FROM", site) or self.smtp_user
        self.smtp_enabled = all([self.smtp_host, self.smtp_port, self.smtp_user,
                                 self.smtp_pass, self.mail_to])

    def _sms_payload(self, receivers: list[str], message: str, tag: str) -> dict:
        """Build the keyword arguments posted to the OVH /sms/{service}/jobs endpoint."""
        return {
            "sender": self.ovh_sender,
            "receivers": receivers,
            "message": message,
            "priority": "high",
            "coding": "7bit",
            "class": "phoneDisplay",
            "noStopClause": True,
            "senderForResponse": False,
            "validityPeriod": 2880,
            "tag": tag,
        }

    def send_sms(self, message: str, tag: str | None = None) -> bool:
        """Send an internal SMS to the site's recipients (filtered by ALERT_SMS_ONLY).

        Returns:
            True if the OVH job was posted, False if SMS is disabled, no
            recipient remains, or the API call failed (errors are logged).
        """
        tag = tag or f"monitor-{self.site.lower()}"
        if not self.ovh_enabled or not self.sms_labeled:
            log.warning("SMS désactivé ou aucun destinataire.")
            return False
        receivers = _resolve_sms_receivers(self.sms_labeled)
        if not receivers:
            log.warning("ALERT_SMS_ONLY filtre tous les destinataires (aucun envoi).")
            return False
        message = normaliser_sms(message, prefix=self.site)
        payload = self._sms_payload(receivers, message, tag)
        try:
            log.info("Envoi SMS vers: %s",
                     _human_labeled_list([(n, p) for (n, p) in self.sms_labeled if p in receivers]))
            resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
            ids = resp.get("ids") or []
            log.info("SMS OVH envoyé (job ids=%s)", ids)
            return True
        except OVHAPIError as err:
            log.exception("Erreur API OVH: %s", err)
            return False
        except Exception as err:
            log.exception("Echec envoi SMS OVH: %s", err)
            return False

    def send_sms_client(self, message: str, tag: str | None = None) -> bool:
        """Send an SMS to the client recipients (filtered by ALERT_SMS_CLIENT_ONLY).

        Returns:
            True if the OVH job was posted, False otherwise (reason is logged).
        """
        tag = tag or f"monitor-client-{self.site.lower()}"
        if not self.ovh_enabled:
            log.warning("SMS client: OVH non configuré.")
            return False
        if not self.sms_client_enabled or not self.sms_client_labeled:
            log.info("SMS client: désactivé ou aucun destinataire.")
            return False
        labeled = _filter_labeled(self.sms_client_labeled, os.getenv("ALERT_SMS_CLIENT_ONLY"))
        receivers = [num for (_n, num) in labeled]
        if not receivers:
            log.info("SMS client: filtre vide → aucun envoi.")
            return False
        message = normaliser_sms(message, prefix=self.site)
        payload = self._sms_payload(receivers, message, tag)
        try:
            log.info("Envoi SMS CLIENT vers: %s", _human_labeled_list(labeled))
            resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
            log.info("SMS CLIENT OVH envoyé (job ids=%s)", resp.get("ids"))
            return True
        except Exception as err:
            log.exception("Echec SMS CLIENT OVH: %s", err)
            return False

    def send_email(self, subject: str, body: str) -> bool:
        """Send a plain-text e-mail to the site's recipients.

        Uses STARTTLS when SMTP_SECURITY is "STARTTLS", implicit SSL otherwise.

        Returns:
            True on success, False if SMTP is not configured or sending
            failed (errors are logged, never raised).
        """
        if not self.smtp_enabled:
            log.warning("SMTP non configuré, email non envoyé.")
            return False
        msg = EmailMessage()
        msg["From"] = self.mail_from
        msg["To"] = ", ".join(self.mail_to)
        msg["Subject"] = subject
        msg.set_content(body)
        timeout = int(os.getenv("SMTP_TIMEOUT", "60"))
        debug = _env_bool("SMTP_DEBUG", False)

        def _send_ssl():
            with smtplib.SMTP_SSL(self.smtp_host, self.smtp_port,
                                  context=ssl.create_default_context(),
                                  timeout=timeout) as server:
                if debug:
                    server.set_debuglevel(1)
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        def _send_starttls():
            with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=timeout) as server:
                if debug:
                    server.set_debuglevel(1)
                server.ehlo()
                server.starttls(context=ssl.create_default_context())
                server.ehlo()
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        try:
            if self.smtp_security == "STARTTLS":
                _send_starttls()
            else:
                _send_ssl()
            log.info("Email envoyé à %s", self.mail_to)
            return True
        except (smtplib.SMTPException, ssl.SSLError, TimeoutError) as err:
            log.exception("Erreur SMTP: %s", err)
            return False
        except Exception as err:
            log.exception("Echec envoi email: %s", err)
            return False


# ========= Messages =========
def build_alert_text(site: str, sonde: str, temp: float, seuil: float,
                     when: dt.datetime | None = None):
    """Build the over-threshold message.

    Returns:
        (subject, sms_text, email_body); the SMS text and e-mail body are
        the same string.
    """
    when = when or now_paris()
    subject = f"[ALERTE {site}] {sonde} au-dessus du seuil"
    lines = [
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} (seuil {fmt_deg(seuil)})",
        f"Site: {site}",
        f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    txt = "\n".join(lines)
    return subject, txt, txt


def build_ok_text(site: str, sonde: str, temp: float, seuil: float,
                  when: dt.datetime | None = None):
    """Build the back-to-normal message.

    Returns:
        (subject, sms_text, email_body); the SMS text and e-mail body are
        the same string.
    """
    when = when or now_paris()
    subject = f"[OK {site}] {sonde} revenue normale"
    lines = [
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}",
        f"Site: {site}",
        f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    txt = "\n".join(lines)
    return subject, txt, txt


def build_client_alert_sms(site: str, sonde: str, temp: float, seuil: float,
                           when: dt.datetime | None = None) -> str:
    """Build the short one-line alert SMS sent to clients (`site` is not included in the text)."""
    when = when or now_paris()
    return f"ALERTE CLIENT {sonde}: T={fmt_deg(temp)} > S={fmt_deg(seuil)} H:{when.strftime('%H:%M')}"


# ========= MQTT beacon =========
class MQTTPublisher:
    """Publishes beacon ON/OFF commands over MQTT and journals beacon messages to the DB.

    If paho-mqtt is missing or the broker connection fails, the instance is
    disabled and `set()` becomes a no-op.
    """

    def __init__(self, site: str):
        self.enabled = bool(_mqtt_ok)
        self.site = site
        self.topic = _site_env("GYRO_MQTT_TOPIC", site) or f"Sondes/{site}/Gyro/cmd"
        # Last state successfully published (None = nothing published yet).
        self.last_state: bool | None = None
        if not self.enabled:
            log.info("Gyro MQTT désactivé (paho-mqtt absent).")
            return

        host = os.getenv("MQTT_HOST", "localhost")
        port = int(os.getenv("MQTT_PORT", "1883"))
        user = os.getenv("MQTT_USER")
        pwd = os.getenv("MQTT_PASS")
        tls = _env_bool("MQTT_TLS", False)

        # paho 1.x / 2.x compatibility: 2.x exposes CallbackAPIVersion.
        cbver = getattr(mqtt, "CallbackAPIVersion", None)
        if cbver is not None:
            api_v = (
                getattr(cbver, "VERSION2", None)
                or getattr(cbver, "V5", None)
                or getattr(cbver, "v5", None)
                or getattr(cbver, "V311", None)
            )
            try:
                self.client = mqtt.Client(callback_api_version=api_v) if api_v else mqtt.Client()
            except TypeError:
                self.client = mqtt.Client()
        else:
            self.client = mqtt.Client()

        if user and pwd:
            self.client.username_pw_set(user, pwd)
        if tls:
            self.client.tls_set()

        try:
            self.client.on_message = self._on_message
            self.client.connect(host, port, keepalive=30)
            subs = [t.strip() for t in (_site_env("GYRO_MQTT_SUB", site) or "").split(",")
                    if t.strip()]
            if not subs:
                subs = [
                    self.topic,
                    f"Sondes/{site}/Gyro/#",
                    f"{site}/Gyro/#",
                    "Gyro/#",
                ]
            # NOTE(review): subscriptions are issued once here, not from an
            # on_connect callback - confirm they survive a broker reconnect.
            for t in subs:
                try:
                    self.client.subscribe(t, qos=2)
                    log.info("MQTT subscribe: %s", t)
                except Exception as e:
                    log.warning("Subscribe échoué (%s): %s", t, e)
            self.client.loop_start()
            log.info("MQTT connecté (%s:%s), topic=%s", host, port, self.topic)
        except Exception as e:
            log.exception("MQTT connexion impossible: %s", e)
            self.enabled = False

    def _on_message(self, client, userdata, msg):
        """Journal beacon-related messages to the Gyro table when the state changes."""
        lieu = self.site
        topic = msg.topic
        payload_raw = msg.payload.decode(errors="ignore").strip()
        upper = payload_raw.upper()
        if upper in ("ON", "OFF") or "gyro" in topic.lower():
            # Non ON/OFF payloads on a gyro topic: "ON" if the text contains
            # "ON", else "OFF" (heuristic kept from the original).
            etat = upper if upper in ("ON", "OFF") else ("ON" if "ON" in upper else "OFF")
            try:
                # De-duplicate against the same probe name insert_gyro_log writes.
                if should_insert_gyro(lieu, etat, _gyro_sonde_name()):
                    insert_gyro_log(
                        lieu=lieu,
                        etat=etat,
                        topic=topic,
                        payload_raw=payload_raw,
                        qos=getattr(msg, "qos", None),
                        retained=getattr(msg, "retain", None),
                        when=now_paris(),
                    )
            except Exception as e:
                log.exception("Insert Gyro échoué: %s", e)
            return
        # Anything else is ignored; only non-numeric payloads are worth a debug line.
        try:
            float(payload_raw.replace(",", "."))
        except ValueError:
            log.debug("Payload non géré (ni gyro ni nombre): %s %s", topic, payload_raw)

    def set(self, on: bool):
        """Publish ON/OFF (QoS 2, retained) if the state changed, then journal it.

        Does nothing when disabled or when `on` equals the last published
        state. `last_state` is only updated after a successful publish.
        """
        if not self.enabled:
            return
        if self.last_state is not None and self.last_state == on:
            return
        payload = "ON" if on else "OFF"
        try:
            r = self.client.publish(self.topic, payload=payload, qos=2, retain=True)
            try:
                r.wait_for_publish(timeout=3)
            except Exception as e:
                # Best-effort wait (e.g. older paho without `timeout`); the
                # return code below is still checked.
                log.debug("wait_for_publish: %s", e)
            if getattr(r, "rc", 0) != 0:
                log.warning("MQTT publish rc=%s (topic=%s)", getattr(r, "rc", None), self.topic)
                return
            log.info("Gyro %s -> %s (MQTT)", self.site, payload)
            try:
                insert_gyro_log(
                    lieu=self.site,
                    etat=payload,
                    topic=self.topic,
                    payload_raw=payload,
                    qos=2,
                    retained=1,
                    when=now_paris(),
                )
            except Exception as e:
                log.exception("Insert événement gyro DB échoué: %s", e)
            self.last_state = on
        except Exception as e:
            log.exception("MQTT publish erreur: %s", e)


# ========= Reactive beacon controller =========
class _GyroState(enum.Enum):
    IDLE = 0
    PULSE_ON = 1
    COOLDOWN = 2


class GyroPulseController:
    """Independent fast loop driving the beacon.

    - CONTINUOUS MODE (default): ON while the alarm persists, OFF once
      "normal" has been confirmed `normal_confirm` times in a row.
    - PULSE MODE (GYRO_MODE_CONTINUOUS off): ON for `pulse_sec`, then OFF for
      `cooldown_sec`, repeated while the alarm persists.

    Optional extras:
    - immediate ALERT SMS when the beacon turns on (ALERT_INTERNAL_SMS_ENABLED)
    - immediate OK SMS when it turns off (ALERT_OK_SMS_GYRO)
    Both are off unless the corresponding env flag is set.
    """

    def __init__(self, site: str, beacon: MQTTPublisher, notifier: Notifier, *,
                 check_sec: int = int(os.getenv("GYRO_CHECK_SEC", "20")),
                 pulse_sec: int = int(os.getenv("GYRO_PULSE_SEC", "60")),
                 cooldown_sec: int = int(os.getenv("GYRO_COOLDOWN_SEC", "600")),
                 normal_confirm: int = int(os.getenv("GYRO_NORMAL_CONFIRM", "2"))):
        self.site = site
        self.beacon = beacon
        self.notifier = notifier
        self.check_sec = check_sec
        self.pulse_sec = pulse_sec
        self.cooldown_sec = cooldown_sec
        self.normal_confirm = normal_confirm

        self.state = _GyroState.IDLE
        self._t_pulse_end = 0.0
        self._t_cooldown_end = 0.0
        self._normal_count = 0
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._current: bool | None = None

        # Per-probe timestamp of the last SMS, used for rate limiting.
        self._last_sms: dict[str, float] = {}
        self._sms_min_sec = int(os.getenv("ALERT_SMS_COOLDOWN_SEC")
                                or os.getenv("GYRO_SMS_MIN_SEC", "120"))
        # Same default (off) as the gate in _run, so the two never disagree.
        self._send_ok = _env_bool("ALERT_OK_SMS_GYRO", False)
        self._last_trigger: tuple[str, float, float] | None = None

    def _set_gyro(self, on: bool):
        """Forward a state change to the beacon, skipping repeats of the current state."""
        if self._current != on:
            self.beacon.set(on)
            self._current = on

    def start(self):
        """Start the background loop (daemon thread); no-op if already running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        log.info("GyroPulseController démarré (site=%s, check=%ss, pulse=%ss, cooldown=%ss, confirm=%d)",
                 self.site, self.check_sec, self.pulse_sec, self.cooldown_sec, self.normal_confirm)

    def stop(self):
        """Ask the background loop to exit; it wakes up immediately from its wait."""
        self._stop.set()

    def _sms_can_send(self, sonde: str) -> bool:
        """Rate limiter: True (and records the send time) if the per-probe cooldown elapsed."""
        t = time.time()
        last = self._last_sms.get(sonde, 0.0)
        if (t - last) >= self._sms_min_sec:
            self._last_sms[sonde] = t
            return True
        return False

    def _send_alert_sms(self, trigger: tuple[str, float, float] | None):
        """Send the internal alert SMS for `trigger`, if enabled and not rate-limited."""
        if not _env_bool("ALERT_INTERNAL_SMS_ENABLED", False):
            return
        if not trigger:
            return
        sonde, temp, seuil = trigger
        if self._sms_can_send(sonde):
            _, sms_text, _ = build_alert_text(self.site, sonde, temp, seuil, when=now_paris())
            self.notifier.send_sms(sms_text)

    def _send_ok_sms_from_last_trigger(self):
        """Send the OK SMS for the probe that triggered the last alarm, then forget it."""
        if not self._send_ok or not self._last_trigger:
            return
        sonde, _temp_prev, seuil = self._last_trigger
        # Current temperature, best effort: fall back to just under the threshold.
        rows = lire_sondes_depuis_db(self.site)
        curr_temp = next(
            (float(r["Temperature"]) for r in rows if str(r["Sonde"]) == sonde),
            None,
        )
        if curr_temp is None:
            curr_temp = seuil - 0.1
        if self._sms_can_send(sonde):
            _, sms_text, _ = build_ok_text(self.site, sonde, curr_temp, seuil, when=now_paris())
            self.notifier.send_sms(sms_text)
        self._last_trigger = None

    def _is_alarm_now(self) -> tuple[bool, tuple[str, float, float] | None]:
        """Read the latest values and config and evaluate the instantaneous alarm state."""
        last_rows = lire_sondes_depuis_db(self.site)
        cfg = lire_cfg_chambres(self.site)
        return compute_site_alarm(last_rows, cfg,
                                  hysteresis=float(os.getenv("GYRO_HYSTERESIS", "0.0")))

    def _run(self):
        """State-machine loop: IDLE -> PULSE_ON -> (COOLDOWN -> PULSE_ON)* -> IDLE."""
        while not self._stop.is_set():
            now_ts = time.time()
            try:
                active, trigger = self._is_alarm_now()
            except Exception as e:
                # The alarm state is unknown: keep the beacon as it is rather
                # than counting the failure as "back to normal" (a DB outage
                # must not switch a real alarm off or send a false OK SMS).
                log.exception("Gyro fast-loop: erreur lecture état: %s", e)
                self._stop.wait(self.check_sec)
                continue

            if self.state == _GyroState.IDLE:
                if active:
                    self._set_gyro(True)
                    self._t_pulse_end = now_ts + self.pulse_sec
                    self._normal_count = 0
                    self.state = _GyroState.PULSE_ON
                    self._last_trigger = trigger
                    if trigger:
                        s, t, se = trigger
                        log.info("Gyro → ON déclenché par %s: %.2f > %.2f (mode %s)", s, t, se,
                                 "CONTINU" if _env_bool("GYRO_MODE_CONTINUOUS", True) else "PULSE")
                    # Immediate alert SMS (optional)
                    if _env_bool("ALERT_INTERNAL_SMS_ENABLED", False):
                        self._send_alert_sms(trigger)

            elif self.state == _GyroState.PULSE_ON:
                if not active:
                    self._normal_count += 1
                    if self._normal_count >= self.normal_confirm:
                        self._set_gyro(False)
                        self.state = _GyroState.IDLE
                        self._normal_count = 0
                        log.info("Gyro → OFF (retour à la normale confirmé)")
                        if _env_bool("ALERT_OK_SMS_GYRO", False):
                            self._send_ok_sms_from_last_trigger()
                else:
                    self._normal_count = 0
                    # Pulse mode only: end of the ON pulse while the alarm persists.
                    if not _env_bool("GYRO_MODE_CONTINUOUS", True):
                        if now_ts >= self._t_pulse_end:
                            self._set_gyro(False)
                            self._t_cooldown_end = now_ts + self.cooldown_sec
                            self.state = _GyroState.COOLDOWN
                            log.info("Gyro → OFF, cooldown %ss (alerte persiste)", self.cooldown_sec)

            elif self.state == _GyroState.COOLDOWN:
                if not active:
                    self._normal_count += 1
                    if self._normal_count >= self.normal_confirm:
                        self.state = _GyroState.IDLE
                        self._normal_count = 0
                        log.info("Gyro: retour IDLE (plus d’alerte)")
                else:
                    self._normal_count = 0
                    if now_ts >= self._t_cooldown_end:
                        self._set_gyro(True)
                        self._t_pulse_end = now_ts + self.pulse_sec
                        self.state = _GyroState.PULSE_ON
                        log.info("Gyro → ON (re-pulse)")

            # Event.wait instead of time.sleep so stop() takes effect at once.
            self._stop.wait(self.check_sec)


# ========= High-level notifications =========
def notifier_sur_depassement(notifier: Notifier, site: str, sonde: str, temp: float, seuil: float):
    """Notify a confirmed alert (sustained and opened in the DB).

    Sends the e-mail, plus the client SMS when ALERT_SMS_CLIENT_ENABLED is on.
    (The immediate internal SMS is handled by the beacon loop.)
    """
    subject, _sms_text, email_body = build_alert_text(site, sonde, temp, seuil)
    notifier.send_email(subject, email_body)
    if _env_bool("ALERT_SMS_CLIENT_ENABLED", True):
        client_msg = build_client_alert_sms(site, sonde, temp, seuil)
        notifier.send_sms_client(client_msg, tag=f"client-{site.lower()}")


def notifier_acquittement(notifier: Notifier, site: str, sonde: str, temp: float, seuil: float):
    """Notify that an alert was acknowledged in the DB.

    Sends the e-mail, plus an internal SMS when ALERT_OK_SMS is on.
    (The beacon loop has its own optional OK SMS.)
    """
    subject, sms_text, email_body = build_ok_text(site, sonde, temp, seuil)
    notifier.send_email(subject, email_body)
    if _env_bool("ALERT_OK_SMS", False):
        notifier.send_sms(sms_text)


# ========= Cycle & loop =========
def run_monitor_cycle(site: str, notifier: Notifier):
    """Run one slow-loop pass: open/close DB alerts and send the matching notifications."""
    site = safe_site(site)

    # 1) Latest measurements + configuration
    last_rows = lire_sondes_depuis_db(site)
    cfg = lire_cfg_chambres(site)

    # 2) Informational only: instantaneous state (the beacon itself is driven
    #    by the fast loop).
    try:
        _is_on, trigger = compute_site_alarm(
            last_rows, cfg, hysteresis=float(os.getenv("GYRO_HYSTERESIS", "0.0")))
        if trigger:
            s, t, se = trigger
            log.info("Dépassement détecté (gyro géré par boucle rapide) : %s %.2f > %.2f", s, t, se)
        else:
            log.info("Aucun dépassement au moment du cycle")
    except Exception as e:
        log.exception("Erreur calcul alarme (info): %s", e)

    # 3) Delayed alerts (sustained excess): only for configured, active
    #    probes that are not under maintenance.
    table_alertes = f"Alertes_{site}"
    for r in last_rows:
        nom = str(r["Sonde"])
        temp = float(r["Temperature"])
        meta = cfg.get(nom)
        if not meta:
            continue
        if (not meta["active"]) or meta["entretien"]:
            continue
        seuil = float(meta["temp_max"])
        now_ = now_paris()
        if temp > seuil:
            if depassement_depuis_30min(site, nom, seuil):
                conn = None
                try:
                    conn = get_db()
                    if open_alert(conn, table_alertes, nom, now_):
                        notifier_sur_depassement(notifier, site, nom, temp, seuil)
                finally:
                    if conn:
                        conn.close()
        else:
            conn = None
            try:
                conn = get_db()
                if close_alert(conn, table_alertes, nom):
                    notifier_acquittement(notifier, site, nom, temp, seuil)
            finally:
                if conn:
                    conn.close()


def run_monitor_loop(site: str, period_sec: int = 300):
    """Start the fast beacon loop, then run monitoring cycles forever every `period_sec` seconds."""
    site = safe_site(site)
    notifier = Notifier(site)
    beacon = MQTTPublisher(site)
    log.info("%s démarré (site=%s, période=%ss) ✅", f"Monitor_{site}", site, period_sec)

    # Fast beacon loop
    try:
        gyro_controller = GyroPulseController(site, beacon, notifier)
        gyro_controller.start()
    except Exception as e:
        log.exception("Impossible de démarrer le GyroPulseController: %s", e)

    # Slow loop
    while True:
        t0 = time.time()
        try:
            run_monitor_cycle(site, notifier)
        except Exception as err:
            log.exception("Erreur cycle monitoring: %s", err)
        # Sleep for the remainder of the period (never a negative duration).
        time.sleep(max(0, period_sec - (time.time() - t0)))


# ========= CLI =========
if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser(description=PROGRAM_NAME)
    p.add_argument("--site", default=os.getenv("SITE", SITE),
                   help="Nom du site (ex: Meudon, Saclay, Roissy)")
    p.add_argument("--period", type=int, default=300)
    p.add_argument("--test-sms", action="store_true")
    p.add_argument("--test-mail", action="store_true")
    p.add_argument("--test-alert", action="store_true")
    p.add_argument("--test-ok", action="store_true")
    p.add_argument("--once", action="store_true")
    args = p.parse_args()

    site = safe_site(args.site)
    notifier = Notifier(site)

    if args.test_sms:
        notifier.send_sms("TEST DOMO91 (transactionnel)")
    elif args.test_mail:
        notifier.send_email(f"[TEST {site}] Mail", "OK")
    elif args.test_alert:
        notifier_sur_depassement(notifier, site, "Congelateur", -14.5, -15.0)
    elif args.test_ok:
        notifier_acquittement(notifier, site, "Congelateur", -15.2, -15.0)
    else:
        if args.once:
            run_monitor_cycle(site, notifier)
        else:
            run_monitor_loop(site, period_sec=args.period)