#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Cold-room temperature monitor for a single site.

The script reads the latest probe temperatures from MySQL, compares them with
the per-probe thresholds stored in ``Chambres_froides`` and then:

* drives an MQTT beacon ("gyro") from a fast background loop,
* sends a Synology Chat message when the beacon is triggered / released,
* opens / closes rows in ``Alertes_<site>`` and sends e-mails once an
  overshoot has lasted long enough.

All settings are read from environment variables (optionally from a ``.env``
file loaded with python-dotenv).
"""

# ========= Site =========
SITE = "Saclay"
PROGRAM_NAME = f"Monitor_{SITE}"

# ========= Imports & .env =========
import json
import logging
import os
import smtplib
import ssl
import time
import threading
import enum
import datetime as dt
from datetime import datetime
from email.message import EmailMessage
from typing import Any, cast
from zoneinfo import ZoneInfo

import requests
import mysql.connector
from mysql.connector import Error as MySQLError
from dotenv import find_dotenv, load_dotenv

# Existing process environment wins over the .env file (override=False).
load_dotenv(find_dotenv(usecwd=True), override=False)


def _env_str(name: str, default: str = "") -> str:
    """Return environment variable *name* stripped of whitespace, or *default*."""
    return (os.getenv(name, default) or "").strip()


def _env_bool(name: str, default: bool) -> bool:
    """Return True when variable *name* is one of 1/true/yes/on (case-insensitive).

    *default* is used only when the variable is not set at all.
    """
    value = _env_str(name, "1" if default else "0").lower()
    return value in ("1", "true", "yes", "on")


# MQTT is optional: without paho-mqtt the beacon is simply disabled.
try:
    import paho.mqtt.client as mqtt

    _mqtt_ok = True
except Exception:
    mqtt = None  # type: ignore[assignment]
    _mqtt_ok = False

# ========= Logger =========
level = getattr(logging, _env_str("LOGLEVEL", "INFO").upper(), logging.INFO)
log = logging.getLogger(PROGRAM_NAME.lower())
if not log.handlers:
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")


# ========= Type helpers =========
def _to_float(value: Any) -> float:
    """Convert a DB value to float (raises TypeError/ValueError if impossible)."""
    return float(cast(Any, value))


def _to_datetime(value: Any) -> datetime:
    """Return *value* unchanged if it is a datetime, otherwise raise TypeError."""
    if isinstance(value, datetime):
        return value
    raise TypeError(f"datetime attendu, reçu: {type(value)!r}")


# ========= DB utils =========
def get_db():
    """Open a new autocommit MySQL connection configured from DB_* variables."""
    return mysql.connector.connect(
        host=_env_str("DB_HOST"),
        user=_env_str("DB_USER"),
        password=_env_str("DB_PASS"),
        database=_env_str("DB_NAME", "Sondes"),
        port=int(_env_str("DB_PORT", "3306")),
        autocommit=True,
    )


def open_alert(conn, table_alertes: str, sonde: str, dt_: datetime) -> bool:
    """Open ONE alert unless an 'En cours' alert already exists for the probe.

    Args:
        conn: open MySQL connection (not closed here).
        table_alertes: name of the alert table (interpolated as an identifier).
        sonde: probe name.
        dt_: start time of the fault, stored as ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        True if a new alert row was inserted, False if one was already open.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"SELECT 1 FROM `{table_alertes}` WHERE Sonde=%s AND Etat='En cours' LIMIT 1",
            (sonde,),
        )
        if cur.fetchone():
            return False
        cur.execute(
            f"INSERT INTO `{table_alertes}` (Sonde, Debut_defaut, Etat) VALUES (%s, %s, 'En cours')",
            (sonde, dt_.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return True
    finally:
        # Release the cursor on error paths as well.
        cur.close()


def close_alert(conn, table_alertes: str, sonde: str) -> bool:
    """Close the most recent 'En cours' alert of the probe, if any.

    Returns:
        True if exactly one alert was switched to 'Acquitté'.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"UPDATE `{table_alertes}` SET Etat='Acquitté' "
            f"WHERE Sonde=%s AND Etat='En cours' "
            f"ORDER BY Debut_defaut DESC LIMIT 1",
            (sonde,),
        )
        changed = cur.rowcount == 1
        conn.commit()
        return changed
    finally:
        cur.close()


# --- Gyro journal in the dedicated `Gyro` table ---
def insert_gyro_log(
    lieu: str,
    etat: str,
    topic: str,
    payload_raw: str,
    qos: int | None,
    retained: int | None,
    when: datetime,
) -> None:
    """Insert one beacon event into ``Sondes.Gyro``.

    DB errors are logged and swallowed: journaling is best-effort and must not
    break the caller.
    """
    cnx = get_db()
    try:
        cur = cnx.cursor()
        cur.execute(
            "INSERT INTO Sondes.Gyro (Lieu, Sonde, Etat, Date, Topic, Payload, QoS, Retained) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
            (
                lieu,
                _env_str("GYRO_SONDE_NAME", "Gyro"),
                etat,
                when.strftime("%Y-%m-%d %H:%M:%S"),
                topic,
                payload_raw,
                qos,
                retained,
            ),
        )
        cnx.commit()
        log.info("Gyro inséré: %s %s (%s)", lieu, etat, topic)
    except MySQLError as err:
        log.exception("Erreur DB insert_gyro_log: %s", err)
    finally:
        cnx.close()


def should_insert_gyro(lieu: str, etat: str, sonde: str = "Gyro") -> bool:
    """Return True when *etat* differs from the last journaled state (or none exists)."""
    sql = "SELECT Etat FROM Sondes.Gyro WHERE Lieu=%s AND Sonde=%s ORDER BY Date DESC LIMIT 1"
    cnx = get_db()
    try:
        cur = cnx.cursor()
        cur.execute(sql, (lieu, sonde))
        row = cur.fetchone()
        return (row is None) or (row[0] != etat)
    finally:
        cnx.close()


# --- Reading the latest temperature measurements ---
def lire_sondes_depuis_db(site: str) -> list[dict[str, Any]]:
    """Return the latest non-NULL measurement of every probe of table *site*.

    Each row is a dict with keys ``Sonde``, ``Temperature`` (float) and
    ``Date``. Returns an empty list on a MySQL error (the error is logged).
    """
    sql = f"""
        SELECT t1.Sonde, t1.Temperature, t1.Date
        FROM `{site}` t1
        JOIN (
            SELECT Sonde, MAX(Date) AS MaxDate
            FROM `{site}`
            WHERE Temperature IS NOT NULL
            GROUP BY Sonde
        ) t2 ON t1.Sonde=t2.Sonde AND t1.Date=t2.MaxDate
        WHERE t1.Temperature IS NOT NULL
    """
    cnx = get_db()
    try:
        cur = cnx.cursor(dictionary=True)
        cur.execute(sql)
        rows = cast(list[dict[str, Any]], cur.fetchall())
        for row in rows:
            row["Temperature"] = float(row["Temperature"])
        return rows
    except MySQLError as err:
        log.exception("Erreur DB (lire_sondes_depuis_db): %s", err)
        return []
    finally:
        cnx.close()


def lire_cfg_chambres(site: str) -> dict[str, dict[str, float | bool]]:
    """Return ``{sonde: {"temp_max": float, "active": bool}}`` from Chambres_froides.

    A probe is active when its ``Etat`` column equals "ON" (case-insensitive).
    Rows whose ``Temp_Max`` cannot be converted to float are skipped with a
    warning so that one bad row does not disable monitoring of the whole site.
    On a MySQL error the (possibly partial) mapping built so far is returned.
    """
    dbname = _env_str("DB_NAME", "Sondes")
    sql = f"""
        SELECT Sonde, Temp_Max, Etat
        FROM `{dbname}`.`Chambres_froides`
        WHERE Lieu=%s
    """
    cnx = get_db()
    cfg: dict[str, dict[str, float | bool]] = {}
    try:
        cur = cnx.cursor()
        cur.execute(sql, (site,))
        for sonde, temp_max, etat in cur.fetchall():
            try:
                seuil = float(temp_max)
            except (TypeError, ValueError):
                log.warning("Temp_Max invalide pour %s: %r (sonde ignorée)", sonde, temp_max)
                continue
            cfg[str(sonde)] = {
                "temp_max": seuil,
                "active": str(etat).upper() == "ON",
            }
        return cfg
    except MySQLError as err:
        log.exception("Erreur DB (lire_cfg_chambres): %s", err)
        return cfg
    finally:
        cnx.close()


def compute_site_alarm(
    last_values: list[dict[str, Any]],
    cfg: dict[str, dict[str, float | bool]],
    hysteresis: float = 0.0,
) -> tuple[bool, tuple[str, float, float] | None]:
    """Return ``(is_on, trigger)`` with ``trigger = (sonde, temperature, seuil)``.

    The first active probe whose temperature is strictly above
    ``seuil + hysteresis`` triggers the alarm. Probes that are missing from
    *cfg* or inactive are ignored.
    """
    for row in last_values:
        sonde = str(row["Sonde"])
        meta = cfg.get(sonde)
        if not meta or not meta.get("active", False):
            continue
        temp = _to_float(row["Temperature"])
        seuil = _to_float(meta["temp_max"])
        if temp > seuil + hysteresis:
            return True, (sonde, temp, seuil)
    return False, None


def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool:
    """Tell whether the probe has been above *seuil* continuously for long enough.

    The required duration is ``ALERT_CONTINUOUS_MINUTES`` (default 30). Only
    measurements from the last ``ALERT_LOOKBACK_MINUTES`` (default
    ``max(60, 3 * ALERT_CONTINUOUS_MINUTES)``) are examined, newest first.

    Returns:
        True if the newest measurement is above *seuil* and the uninterrupted
        run of above-threshold measurements started at least the required
        number of minutes ago. False otherwise, including on a MySQL error.
    """
    cont_min = int(_env_str("ALERT_CONTINUOUS_MINUTES", "30"))
    lookback = int(_env_str("ALERT_LOOKBACK_MINUTES", str(max(60, cont_min * 3))))
    cnx = get_db()
    try:
        cur = cnx.cursor()
        # NULL temperatures are excluded (as in lire_sondes_depuis_db):
        # float(None) would raise TypeError and abort the whole cycle.
        cur.execute(
            f"""
            SELECT Temperature, Date
            FROM `{site}`
            WHERE Sonde=%s
              AND Temperature IS NOT NULL
              AND Date >= (NOW() - INTERVAL %s MINUTE)
            ORDER BY Date DESC
            """,
            (sonde, lookback),
        )
        rows = cur.fetchall()
        if not rows:
            return False

        first_row = cast(tuple[Any, Any], rows[0])
        last_temp = _to_float(first_row[0])
        last_dt = _to_datetime(first_row[1])
        if last_temp <= seuil:
            return False

        # Walk back in time until the first measurement at or below threshold.
        start_dt = last_dt
        for temp, row_dt in rows[1:]:
            if _to_float(temp) > seuil:
                start_dt = _to_datetime(row_dt)
            else:
                break

        # Use the same tzinfo as the DB value so that naive and aware
        # datetimes are never mixed (tz=None yields naive local time).
        tzinfo = getattr(start_dt, "tzinfo", None)
        now = dt.datetime.now(tz=tzinfo)
        dur_min = (now - start_dt).total_seconds() / 60.0
        log.debug(
            "Seq>seuil %s: start=%s, now=%s, dur=%.1fmin, need>=%d",
            sonde,
            start_dt,
            now,
            dur_min,
            cont_min,
        )
        return dur_min >= cont_min
    except MySQLError as err:
        log.exception("Erreur DB (depassement_depuis_30min): %s", err)
        return False
    finally:
        cnx.close()


# ========= Synology Chat =========
def send_synology_chat(message: str, *, username: str | None = None) -> bool:
    """Post *message* to the Synology Chat incoming webhook.

    The webhook URL is taken from the first non-empty variable among
    ``SYNO_CHAT_WEBHOOK_<SITE>``, ``SYNO_CHAT_WEBHOOK_MONITOR_<SITE upper>``
    and ``SYNO_CHAT_WEBHOOK``.

    Returns:
        True when the server reports success (JSON ``success`` field, or a
        body equal to "ok" / empty), False when the webhook is not configured
        or the request fails.
    """
    webhook = (
        _env_str(f"SYNO_CHAT_WEBHOOK_{SITE}")
        or _env_str(f"SYNO_CHAT_WEBHOOK_MONITOR_{SITE.upper()}")
        or _env_str("SYNO_CHAT_WEBHOOK")
    )
    if not webhook:
        log.info("Synology Chat non configuré.")
        return False

    botname = username or _env_str("SYNO_CHAT_BOTNAME")
    timeout = int(_env_str("SYNO_CHAT_TIMEOUT", "10"))
    verify_ssl = _env_bool("SYNO_CHAT_VERIFY_SSL", True)

    chat_payload: dict[str, str] = {"text": message}
    if botname:
        chat_payload["username"] = botname
    # The JSON document is sent as the form field "payload".
    form_data = {"payload": json.dumps(chat_payload, ensure_ascii=False)}

    try:
        response = requests.post(
            webhook,
            data=form_data,
            timeout=timeout,
            verify=verify_ssl,
        )
        txt = (response.text or "").strip()
        log.info("Réponse Synology Chat: %s", txt[:300] if txt else "")
        response.raise_for_status()
        try:
            data = response.json()
            if isinstance(data, dict):
                success = bool(data.get("success", False))
                if not success:
                    log.warning("Synology Chat a répondu sans succès: %s", data)
                return success
        except ValueError:
            # Body is not JSON: fall back to the plain-text check below.
            pass
        return txt.lower() == "ok" or not txt
    except requests.RequestException as err:
        log.exception("Echec envoi Synology Chat: %s", err)
        return False


# ========= Mail notifier =========
class Notifier:
    """E-mail sender configured from SMTP_* / MAIL_* environment variables."""

    def __init__(self) -> None:
        self.smtp_host = _env_str("SMTP_HOST")
        self.smtp_port = int(_env_str("SMTP_PORT", "465"))
        self.smtp_user = _env_str("SMTP_USER")
        self.smtp_pass = _env_str("SMTP_PASS")
        self.smtp_security = _env_str("SMTP_SECURITY", "SSL").upper()

        # Site-specific recipients take precedence over the global list.
        raw_mail_to = (
            _env_str(f"MAIL_TO_{SITE}")
            or _env_str(f"MAIL_TO_{SITE.upper()}")
            or _env_str("MAIL_TO")
        )
        # Recipients may be separated by "," or ";".
        self.mail_to = [x.strip() for x in raw_mail_to.replace(";", ",").split(",") if x.strip()]
        self.mail_from = (
            _env_str(f"MAIL_FROM_{SITE}")
            or _env_str(f"MAIL_FROM_{SITE.upper()}")
            or _env_str("MAIL_FROM")
            or self.smtp_user
        )
        self.smtp_enabled = all([
            self.smtp_host,
            self.smtp_port,
            self.smtp_user,
            self.smtp_pass,
            self.mail_to,
        ])

    def send_email(self, subject: str, body: str) -> bool:
        """Send a plain-text e-mail to the configured recipients.

        With ``SMTP_SECURITY=STARTTLS`` the message is sent over STARTTLS on
        ``SMTP_PORT``; if that connection fails, an implicit-SSL connection is
        attempted on ``SMTP_SSL_FALLBACK_PORT`` (default 465). Any other
        security value uses implicit SSL on ``SMTP_PORT``.

        Returns:
            True if the message was handed to the server, False otherwise
            (all errors are logged, none is raised).
        """
        if not self.smtp_enabled:
            log.warning("SMTP non configuré, email non envoyé.")
            return False

        msg = EmailMessage()
        msg["From"] = self.mail_from
        msg["To"] = ", ".join(self.mail_to)
        msg["Subject"] = subject
        msg.set_content(body)

        timeout = int(_env_str("SMTP_TIMEOUT", "60"))
        debug = _env_bool("SMTP_DEBUG", False)

        def _send_ssl(port: int) -> None:
            with smtplib.SMTP_SSL(
                self.smtp_host,
                port,
                context=ssl.create_default_context(),
                timeout=timeout,
            ) as server:
                if debug:
                    server.set_debuglevel(1)
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        def _send_starttls() -> None:
            with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=timeout) as server:
                if debug:
                    server.set_debuglevel(1)
                server.ehlo()
                server.starttls(context=ssl.create_default_context())
                server.ehlo()
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        try:
            if self.smtp_security == "STARTTLS":
                try:
                    _send_starttls()
                except (smtplib.SMTPServerDisconnected, TimeoutError, smtplib.SMTPConnectError) as err:
                    log.warning("STARTTLS/587 a échoué (%s). Tentative en SSL/465...", err)
                    # The STARTTLS port does not speak implicit TLS, so the
                    # fallback must target the SSL port rather than SMTP_PORT.
                    _send_ssl(int(_env_str("SMTP_SSL_FALLBACK_PORT", "465")))
            else:
                _send_ssl(self.smtp_port)
            log.info("Email envoyé à %s", self.mail_to)
            return True
        except (smtplib.SMTPException, ssl.SSLError, TimeoutError) as err:
            log.exception("Erreur SMTP: %s", err)
            return False
        except Exception as err:
            log.exception("Échec envoi email: %s", err)
            return False


# ========= Message formatting =========
PARIS = ZoneInfo("Europe/Paris")


def fmt_deg(value: float) -> str:
    """Format a temperature with one decimal and a decimal comma, e.g. ``-14,5°C``."""
    return f"{float(value):.1f}".replace(".", ",") + "°C"


def now_paris() -> dt.datetime:
    """Return the current timezone-aware time in Europe/Paris."""
    return dt.datetime.now(tz=PARIS)


def build_alert_text(
    site: str,
    sonde: str,
    temp: float,
    seuil: float,
    when: dt.datetime | None = None,
) -> tuple[str, str, str]:
    """Build the overshoot notification.

    Returns:
        ``(subject, text, text)``; the second and third items are the same
        multi-line body. *when* defaults to the current Paris time.
    """
    when_dt = when if when is not None else now_paris()
    subject = f"[ALERTE {site}] {sonde} au-dessus du seuil"
    lines = [
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} (seuil {fmt_deg(seuil)})",
        f"Site: {site}",
        f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    txt = "\n".join(lines)
    return subject, txt, txt


def build_ok_text(
    site: str,
    sonde: str,
    temp: float,
    seuil: float,
    when: dt.datetime | None = None,
) -> tuple[str, str, str]:
    """Build the back-to-normal notification.

    Returns:
        ``(subject, text, text)``; the second and third items are the same
        multi-line body. *when* defaults to the current Paris time.
    """
    when_dt = when if when is not None else now_paris()
    subject = f"[OK {site}] {sonde} revenue normale"
    lines = [
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}",
        f"Site: {site}",
        f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    txt = "\n".join(lines)
    return subject, txt, txt


def build_gyro_chat_alert(
    site: str,
    sonde: str,
    temp: float,
    seuil: float,
    when: dt.datetime | None = None,
) -> str:
    """Build the chat message sent when the beacon is triggered."""
    when_dt = when if when is not None else now_paris()
    return (
        f":rotating_light: [{site}] GYRO DECLENCHE\n"
        f"Sonde: {sonde}\n"
        f"Température: {fmt_deg(temp)} > seuil {fmt_deg(seuil)}\n"
        f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}"
    )


def build_gyro_chat_ok(
    site: str,
    sonde: str,
    temp: float,
    seuil: float,
    when: dt.datetime | None = None,
) -> str:
    """Build the chat message sent when the beacon goes back to normal."""
    when_dt = when if when is not None else now_paris()
    return (
        f":white_check_mark: [{site}] GYRO RETOUR NORMALE\n"
        f"Sonde: {sonde}\n"
        f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}\n"
        f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}"
    )


# ========= MQTT beacon =========
class MQTTPublisher:
    """Publishes ON/OFF to the beacon topic and journals gyro messages it receives."""

    def __init__(self, site: str):
        self.enabled = bool(_mqtt_ok)
        self.site = site
        self.topic = (
            _env_str(f"GYRO_MQTT_TOPIC_{site}")
            or _env_str(f"GYRO_MQTT_TOPIC_{site.upper()}")
            or _env_str("GYRO_MQTT_TOPIC")
            or f"Sondes/{site}/Gyro/cmd"
        )
        # Last state successfully published; None until the first publish.
        self.last_state: bool | None = None
        self.client: Any | None = None

        if not self.enabled:
            log.info("Gyro MQTT désactivé (paho-mqtt absent).")
            return
        if not self.topic:
            log.warning("Topic MQTT manquant pour %s.", site)
            self.enabled = False
            return

        host = _env_str("MQTT_HOST", "localhost")
        port = int(_env_str("MQTT_PORT", "1883"))
        user = _env_str("MQTT_USER")
        pwd = _env_str("MQTT_PASS")
        tls = _env_bool("MQTT_TLS", False)

        # Try the constructor that takes a callback API version first, then
        # fall back to the argument-less / client_id forms.
        try:
            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # type: ignore[union-attr]
        except Exception:
            try:
                self.client = mqtt.Client()  # type: ignore[union-attr]
            except TypeError:
                self.client = mqtt.Client(client_id="")  # type: ignore[union-attr]

        if user and pwd:
            self.client.username_pw_set(user, pwd)
        if tls:
            self.client.tls_set()

        try:
            self.client.on_message = self._on_message
            self.client.connect(host, port, keepalive=30)

            subs_env = (
                _env_str(f"GYRO_MQTT_SUB_{site}")
                or _env_str(f"GYRO_MQTT_SUB_{site.upper()}")
                or _env_str("GYRO_MQTT_SUB")
            )
            subs = [topic.strip() for topic in subs_env.split(",") if topic.strip()]
            if not subs:
                subs = [
                    self.topic,
                    f"Sondes/{site}/Gyro/#",
                    f"{site}/Gyro/#",
                    "Gyro/#",
                ]
            for topic in subs:
                try:
                    self.client.subscribe(topic, qos=2)
                    log.info("MQTT subscribe: %s", topic)
                except Exception as err:
                    log.warning("Subscribe échoué (%s): %s", topic, err)

            self.client.loop_start()
            log.info("MQTT connecté (%s:%s), topic=%s", host, port, self.topic)
        except Exception as err:
            log.exception("MQTT connexion impossible: %s", err)
            self.enabled = False

    def _on_message(self, _client, _userdata, msg) -> None:
        """Journal an incoming gyro message when it changes the last stored state."""
        lieu = self.site
        topic = str(msg.topic)
        payload_raw = msg.payload.decode(errors="ignore").strip()
        upper = payload_raw.upper()

        if upper in ("ON", "OFF") or "gyro" in topic.lower() or "gyrophare" in topic.lower():
            # Exact ON/OFF payloads are used as-is; any other payload on a gyro
            # topic counts as ON when it contains the substring "ON".
            etat = upper if upper in ("ON", "OFF") else ("ON" if "ON" in upper else "OFF")
            try:
                if should_insert_gyro(lieu, etat):
                    insert_gyro_log(
                        lieu=lieu,
                        etat=etat,
                        topic=topic,
                        payload_raw=payload_raw,
                        qos=getattr(msg, "qos", None),
                        retained=getattr(msg, "retain", None),
                        when=now_paris(),
                    )
            except Exception as err:
                log.exception("Insert Gyro échoué: %s", err)
            return

        # Non-gyro message: numeric payloads are silently accepted, anything
        # else is only reported at debug level.
        try:
            float(payload_raw.replace(",", "."))
        except ValueError:
            log.debug("Payload non géré (ni gyro ni nombre): %s %s", topic, payload_raw)

    def set(self, on: bool) -> None:
        """Publish ON/OFF (QoS 2, retained) unless it equals the last published state.

        A successful publish is also journaled in the Gyro table. Errors are
        logged, never raised.
        """
        if not self.enabled or self.client is None:
            return
        if self.last_state is not None and self.last_state == on:
            return

        payload = "ON" if on else "OFF"
        try:
            result = self.client.publish(self.topic, payload=payload, qos=2, retain=True)
            try:
                result.wait_for_publish(timeout=3)
            except Exception:
                # Best-effort wait: the rc check below decides success.
                pass

            if getattr(result, "rc", 0) != 0:
                log.warning("MQTT publish rc=%s (topic=%s)", getattr(result, "rc", None), self.topic)
            else:
                log.info("Gyro %s -> %s (MQTT)", self.site, payload)
                try:
                    insert_gyro_log(
                        lieu=self.site,
                        etat=payload,
                        topic=self.topic,
                        payload_raw=payload,
                        qos=2,
                        retained=1 if getattr(result, "is_published", lambda: False)() else None,
                        when=now_paris(),
                    )
                except Exception as err:
                    log.exception("Insert événement gyro en base a échoué: %s", err)
            self.last_state = on
        except Exception as err:
            log.exception("MQTT publish erreur: %s", err)


# ========= Reactive gyro controller =========
class _GyroState(enum.Enum):
    IDLE = 0
    PULSE_ON = 1
    COOLDOWN = 2


class GyroPulseController:
    """Independent fast loop driving the beacon.

    - CONTINUOUS MODE (``GYRO_MODE_CONTINUOUS`` true, the default): ON while
      the alarm persists, OFF once the return to normal is confirmed.
    - PULSE MODE: ON for ``pulse_sec``, then OFF for ``cooldown_sec``, repeated
      while the alarm persists.

    Notifications:
    - immediate Synology Chat message when the gyro is triggered,
    - immediate Synology Chat message on return to normal from the ON state.
    """

    def __init__(
        self,
        site: str,
        beacon: MQTTPublisher,
        *,
        # NOTE: these defaults are read from the environment once, when the
        # class is defined.
        check_sec: int = int(_env_str("GYRO_CHECK_SEC", "20")),
        pulse_sec: int = int(_env_str("GYRO_PULSE_SEC", "60")),
        cooldown_sec: int = int(_env_str("GYRO_COOLDOWN_SEC", "600")),
        normal_confirm: int = int(_env_str("GYRO_NORMAL_CONFIRM", "2")),
    ):
        self.site = site
        self.beacon = beacon
        self.check_sec = check_sec
        self.pulse_sec = pulse_sec
        self.cooldown_sec = cooldown_sec
        self.normal_confirm = normal_confirm

        self.state = _GyroState.IDLE
        self._t_pulse_end = 0.0
        self._t_cooldown_end = 0.0
        # Consecutive "no alarm" checks seen so far.
        self._normal_count = 0
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._current: bool | None = None
        self._last_trigger: tuple[str, float, float] | None = None

    def _set_gyro(self, on: bool) -> None:
        """Forward the state to the beacon only when it changes."""
        if self._current is not on:
            self.beacon.set(on)
            self._current = on

    def start(self) -> None:
        """Start the background thread (no-op if it is already running)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        log.info(
            "GyroPulseController démarré (site=%s, check=%ss, pulse=%ss, cooldown=%ss, confirm=%d)",
            self.site,
            self.check_sec,
            self.pulse_sec,
            self.cooldown_sec,
            self.normal_confirm,
        )

    def stop(self) -> None:
        """Ask the background loop to exit; it wakes up immediately."""
        self._stop.set()

    def _send_chat_alert(self, trigger: tuple[str, float, float] | None) -> None:
        """Send the "gyro triggered" chat message (if enabled and a trigger exists)."""
        if not trigger or not _env_bool("SYNO_CHAT_GYRO_ENABLED", True):
            return
        sonde, temp, seuil = trigger
        chat_msg = build_gyro_chat_alert(self.site, sonde, temp, seuil, when=now_paris())
        send_synology_chat(chat_msg)

    def _send_chat_ok_from_last_trigger(self) -> None:
        """Send the "back to normal" chat message for the probe that triggered."""
        if not self._last_trigger or not _env_bool("SYNO_CHAT_GYRO_ENABLED", True):
            return
        sonde, _temp_prev, seuil = self._last_trigger

        rows = lire_sondes_depuis_db(self.site)
        curr_temp: float | None = None
        for row in rows:
            if str(row["Sonde"]) == sonde:
                curr_temp = float(row["Temperature"])
                break
        if curr_temp is None:
            # No current reading available: report a placeholder just below
            # the threshold.
            curr_temp = seuil - 0.1

        chat_msg = build_gyro_chat_ok(self.site, sonde, curr_temp, seuil, when=now_paris())
        send_synology_chat(chat_msg)
        self._last_trigger = None

    def _is_alarm_now(self) -> tuple[bool, tuple[str, float, float] | None]:
        """Evaluate the site alarm from the latest readings and configuration."""
        last_rows = lire_sondes_depuis_db(self.site)
        cfg = lire_cfg_chambres(self.site)
        return compute_site_alarm(
            last_rows,
            cfg,
            hysteresis=float(_env_str("GYRO_HYSTERESIS", "0.0")),
        )

    def _run(self) -> None:
        """State machine executed every ``check_sec`` seconds until stopped."""
        while not self._stop.is_set():
            now = time.time()
            try:
                active, trigger = self._is_alarm_now()
            except Exception as err:
                # A failed read is treated like "no alarm" for this iteration.
                log.exception("Gyro fast-loop: erreur lecture état: %s", err)
                active, trigger = False, None

            if self.state == _GyroState.IDLE:
                if active:
                    self._set_gyro(True)
                    self._t_pulse_end = now + self.pulse_sec
                    self._normal_count = 0
                    self.state = _GyroState.PULSE_ON
                    self._last_trigger = trigger
                    if trigger:
                        sonde, temp, seuil = trigger
                        mode = "CONTINU" if _env_bool("GYRO_MODE_CONTINUOUS", True) else "PULSE"
                        log.info("Gyro → ON déclenché par %s: %.2f > %.2f (mode %s)", sonde, temp, seuil, mode)
                    self._send_chat_alert(trigger)

            elif self.state == _GyroState.PULSE_ON:
                if not active:
                    # Require several consecutive normal checks before OFF.
                    self._normal_count += 1
                    if self._normal_count >= self.normal_confirm:
                        self._set_gyro(False)
                        self.state = _GyroState.IDLE
                        self._normal_count = 0
                        log.info("Gyro → OFF (retour à la normale confirmé)")
                        self._send_chat_ok_from_last_trigger()
                else:
                    self._normal_count = 0
                    if not _env_bool("GYRO_MODE_CONTINUOUS", True) and now >= self._t_pulse_end:
                        self._set_gyro(False)
                        self._t_cooldown_end = now + self.cooldown_sec
                        self.state = _GyroState.COOLDOWN
                        log.info("Gyro → OFF, cooldown %ss (alerte persiste)", self.cooldown_sec)

            elif self.state == _GyroState.COOLDOWN:
                if not active:
                    self._normal_count += 1
                    if self._normal_count >= self.normal_confirm:
                        # NOTE(review): no chat message is sent on this path,
                        # unlike the PULSE_ON → IDLE transition — confirm intent.
                        self.state = _GyroState.IDLE
                        self._normal_count = 0
                        log.info("Gyro: retour IDLE (plus d’alerte)")
                else:
                    self._normal_count = 0
                    if now >= self._t_cooldown_end:
                        self._set_gyro(True)
                        self._t_pulse_end = now + self.pulse_sec
                        self.state = _GyroState.PULSE_ON
                        log.info("Gyro → ON (re-pulse)")

            # Event.wait() sleeps like time.sleep() but returns as soon as
            # stop() is called, so shutdown is not delayed by check_sec.
            self._stop.wait(self.check_sec)


# ========= High-level notifications =========
notifier = Notifier()
beacon = MQTTPublisher(SITE)

# Set by run_monitor_loop(); kept at module level so the controller stays referenced.
_gyro_controller: GyroPulseController | None = None


def notifier_sur_depassement(site: str, sonde: str, temp: float, seuil: float) -> None:
    """Send the e-mail for an alert that has been confirmed and opened in the DB."""
    subject, _mail_text, email_body = build_alert_text(site, sonde, temp, seuil)
    notifier.send_email(subject, email_body)


def notifier_acquittement(site: str, sonde: str, temp: float, seuil: float) -> None:
    """Send the e-mail for an alert that has just been acknowledged in the DB."""
    subject, _mail_text, email_body = build_ok_text(site, sonde, temp, seuil)
    notifier.send_email(subject, email_body)


# ========= Cycle & loop =========
def run_monitor_cycle(site: str = SITE) -> None:
    """Run one monitoring pass: log the alarm state, then open/close DB alerts.

    For every active probe: above threshold for long enough → open an alert
    (and e-mail if it is new); at or below threshold → close the open alert
    (and e-mail if one was closed).
    """
    last_rows = lire_sondes_depuis_db(site)
    cfg = lire_cfg_chambres(site)

    # Informational only: the beacon itself is driven by GyroPulseController.
    try:
        _gyro_on, trigger = compute_site_alarm(
            last_rows,
            cfg,
            hysteresis=float(_env_str("GYRO_HYSTERESIS", "0.0")),
        )
        if trigger:
            sonde, temp, seuil = trigger
            log.info("Dépassement détecté (gyro géré par boucle rapide) : %s %.2f > %.2f", sonde, temp, seuil)
        else:
            log.info("Aucun dépassement au moment du cycle")
    except Exception as err:
        log.exception("Erreur calcul alarme (info): %s", err)

    seuils = {sonde: float(meta["temp_max"]) for sonde, meta in cfg.items() if meta.get("active", False)}

    for row in last_rows:
        nom = str(row["Sonde"])
        temp = float(row["Temperature"])
        if nom not in seuils:
            continue
        seuil = seuils[nom]
        now_ = now_paris()

        if temp > seuil:
            if depassement_depuis_30min(site, nom, seuil):
                conn = None
                try:
                    conn = get_db()
                    if open_alert(conn, f"Alertes_{site}", nom, now_):
                        notifier_sur_depassement(site, nom, temp, seuil)
                finally:
                    if conn:
                        conn.close()
        else:
            conn = None
            try:
                conn = get_db()
                if close_alert(conn, f"Alertes_{site}", nom):
                    notifier_acquittement(site, nom, temp, seuil)
            finally:
                if conn:
                    conn.close()


def run_monitor_loop(site: str = SITE, period_sec: int = 300) -> None:
    """Start the gyro controller, then run a monitoring cycle every *period_sec* seconds.

    Never returns; an exception raised by a cycle is logged and the loop goes on.
    """
    log.info("%s démarré (site=%s, période=%ss) ✅", PROGRAM_NAME, site, period_sec)
    try:
        global _gyro_controller
        _gyro_controller = GyroPulseController(site, beacon)
        _gyro_controller.start()
    except Exception as err:
        log.exception("Impossible de démarrer le GyroPulseController: %s", err)

    while True:
        t0 = time.time()
        try:
            run_monitor_cycle(site)
        except Exception as err:
            log.exception("Erreur cycle monitoring: %s", err)
        # Subtract the time spent in the cycle to keep a steady period.
        time.sleep(max(0.0, period_sec - (time.time() - t0)))


# ========= CLI =========
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description=PROGRAM_NAME)
    parser.add_argument("--period", type=int, default=300)
    parser.add_argument("--test-mail", action="store_true")
    parser.add_argument("--test-alert", action="store_true")
    parser.add_argument("--test-ok", action="store_true")
    parser.add_argument("--test-chat", action="store_true")
    parser.add_argument("--once", action="store_true")
    args = parser.parse_args()

    if args.test_mail:
        notifier.send_email(f"[TEST {SITE}] Mail", "OK")
    elif args.test_alert:
        notifier_sur_depassement(SITE, "Congelateur", -14.5, -15.0)
    elif args.test_ok:
        notifier_acquittement(SITE, "Congelateur", -15.2, -15.0)
    elif args.test_chat:
        send_synology_chat(f":speech_balloon: [TEST {SITE}] Notification Synology Chat OK")
    else:
        if args.once:
            run_monitor_cycle(SITE)
        else:
            run_monitor_loop(SITE, period_sec=args.period)