#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json import logging import os import re import threading import time from typing import Any import mysql.connector import paho.mqtt.client as mqtt import requests from dotenv import find_dotenv, load_dotenv load_dotenv(find_dotenv(usecwd=True), override=False) _ALLOWED_SITE_RE = re.compile(r"^[A-Za-z0-9_]+$") def safe_site(site: str) -> str: site = (site or "").strip() if not site or not _ALLOWED_SITE_RE.fullmatch(site): raise ValueError(f"Nom de site invalide: {site!r}") return site def _env_str(name: str, default: str = "") -> str: return (os.getenv(name, default) or "").strip() def _env_bool(name: str, default: bool = False) -> bool: value = _env_str(name, "1" if default else "0").lower() return value in ("1", "true", "yes", "on") logging.basicConfig( level=getattr(logging, _env_str("LOGLEVEL", "INFO").upper(), logging.INFO), format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) log = logging.getLogger("gyro") DEF_CHECK_SEC = int(_env_str("GYRO_CHECK_SEC", "20")) DEF_NORMAL_CONFIRM = int(_env_str("GYRO_NORMAL_CONFIRM", "6")) def send_synology_chat(message: str, *, site: str, username: str | None = None) -> bool: webhook = ( _env_str(f"SYNO_CHAT_WEBHOOK_{site}") or _env_str(f"SYNO_CHAT_WEBHOOK_{site.upper()}") or _env_str("SYNO_CHAT_WEBHOOK") ) if not webhook: log.info("[%s] Synology Chat non configuré.", site) return False botname = username or _env_str("SYNO_CHAT_BOTNAME", "Gestion Gyro") timeout = int(_env_str("SYNO_CHAT_TIMEOUT", "10")) verify_ssl = _env_bool("SYNO_CHAT_VERIFY_SSL", True) chat_payload: dict[str, str] = {"text": message} if botname: chat_payload["username"] = botname form_data = {"payload": json.dumps(chat_payload, ensure_ascii=False)} try: response = requests.post( webhook, data=form_data, timeout=timeout, verify=verify_ssl, ) response.raise_for_status() txt = (response.text or "").strip() log.info("[%s] Réponse Synology Chat: %s", site, txt[:300] if txt else "") try: data = response.json() if isinstance(data, dict): success = bool(data.get("success", False)) if not success: log.warning("[%s] Synology Chat a répondu sans succès: %s", site, data) return success except ValueError: pass return txt.lower() == "ok" or not txt except requests.RequestException as exc: log.exception("[%s] Echec envoi Synology Chat: %s", site, exc) return False class MqttGyroDriver: def __init__(self, host: str, port: int, user: str, password: str, topic_cmd: str): self.topic_cmd = topic_cmd try: self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) except Exception: self.client = mqtt.Client() if user: self.client.username_pw_set(user, password or "") self.client.connect(host, int(port), keepalive=30) self.client.loop_start() log.info("MQTT connecté (%s:%s), topic=%s", host, port, topic_cmd) def set(self, on: bool) -> None: payload = "ON" if on else "OFF" result = self.client.publish(self.topic_cmd, payload=payload, qos=1, retain=False) result.wait_for_publish(timeout=5) log.info("MQTT → %s : %s", self.topic_cmd, payload) def close(self) -> None: try: self.client.loop_stop() self.client.disconnect() except Exception: pass class GyroController: """ Gyro ON en continu tant qu'il existe au moins une alerte Etat='En cours'. Gyro OFF après 'normal_confirm' lectures consécutives sans alerte. Notification Chat sur transition ON/OFF. """ def __init__( self, *, site_name: str, db_cfg: dict[str, Any], alertes_table: str, mqtt_driver: MqttGyroDriver, check_sec: int = DEF_CHECK_SEC, normal_confirm: int = DEF_NORMAL_CONFIRM, ): self.site = site_name self.db_cfg = db_cfg self.alertes_table = alertes_table self.mqtt = mqtt_driver self.check_sec = check_sec self.normal_confirm = normal_confirm self._stop = threading.Event() self._thread: threading.Thread | None = None self._current_on: bool | None = None self._normal_count = 0 def _send_chat_on(self) -> None: if not _env_bool("SYNO_CHAT_GYRO_ENABLED", True): return message = ( f":rotating_light: [{self.site}] GYRO DECLENCHE\n" f"Table alertes: {self.alertes_table}\n" "Etat: au moins une alerte en cours" ) send_synology_chat(message, site=self.site) def _send_chat_off(self) -> None: if not _env_bool("SYNO_CHAT_GYRO_ENABLED", True): return message = ( f":white_check_mark: [{self.site}] GYRO RETOUR NORMALE\n" f"Table alertes: {self.alertes_table}\n" "Etat: plus d'alerte en cours" ) send_synology_chat(message, site=self.site) def _set_gyro(self, on: bool) -> None: if self._current_on is on: return previous = self._current_on self.mqtt.set(on) self._current_on = on if previous is None: log.info("[%s] Etat initial Gyro: %s", self.site, "ON" if on else "OFF") return if on: log.info("[%s] Transition Gyro OFF → ON", self.site) self._send_chat_on() else: log.info("[%s] Transition Gyro ON → OFF", self.site) self._send_chat_off() def _has_active_alert(self, cur) -> bool: cur.execute(f"SELECT COUNT(*) FROM `{self.alertes_table}` WHERE Etat='En cours'") row = cur.fetchone() return bool(row and row[0] > 0) def start(self) -> None: if self._thread and self._thread.is_alive(): return self._stop.clear() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() log.info( "[%s] GyroController démarré (check=%ss, confirm=%d)", self.site, self.check_sec, self.normal_confirm, ) def stop(self) -> None: self._stop.set() def _connect_mysql(self): while not self._stop.is_set(): try: cnx = mysql.connector.connect(autocommit=True, **self.db_cfg) cur = cnx.cursor() return cnx, cur except Exception as exc: log.error("[%s] Connexion MySQL KO (%s). Retry 5s...", self.site, exc) time.sleep(5) return None, None def _run(self) -> None: cnx, cur = self._connect_mysql() if not cnx or not cur: return try: try: self._set_gyro(False) except Exception: pass while not self._stop.is_set(): try: active = self._has_active_alert(cur) except Exception as exc: log.error("[%s] Lecture alertes KO: %s -> reconnexion MySQL", self.site, exc) try: cur.close() cnx.close() except Exception: pass cnx, cur = self._connect_mysql() if not cnx or not cur: break active = False if active: self._normal_count = 0 self._set_gyro(True) else: self._normal_count += 1 if self._normal_count >= self.normal_confirm: self._set_gyro(False) time.sleep(self.check_sec) finally: try: self._set_gyro(False) except Exception: pass try: cur.close() cnx.close() except Exception: pass log.info("[%s] GyroController stoppé", self.site) def build_db_cfg() -> dict[str, Any]: return { "host": _env_str("DB_HOST", "162.19.78.131"), "user": _env_str("DB_USER", "sondes"), "password": _env_str("DB_PASS"), "database": _env_str("DB_NAME", "Sondes"), "port": int(_env_str("DB_PORT", "3306")), } def build_topic(site: str) -> str: return ( _env_str(f"GYRO_MQTT_TOPIC_{site}") or _env_str(f"GYRO_MQTT_TOPIC_{site.upper()}") or _env_str("GYRO_MQTT_TOPIC") or f"{site}/gyrophare" ) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Contrôle du gyrophare via table d'alertes") parser.add_argument("--site", default=_env_str("SITE", "Saclay")) parser.add_argument("--test-chat", action="store_true") args = parser.parse_args() site = safe_site(args.site) if args.test_chat: send_synology_chat(f":speech_balloon: [TEST {site}] Notification Synology Chat OK", site=site) raise SystemExit(0) alertes_table = _env_str("ALERTES_TABLE", f"Alertes_{site}") db_cfg = build_db_cfg() mqtt_host = _env_str("MQTT_HOST", "162.19.78.131") mqtt_port = int(_env_str("MQTT_PORT", "1883")) mqtt_user = _env_str("MQTT_USER", "sondes") mqtt_pass = _env_str("MQTT_PASS") topic_cmd = build_topic(site) log.info("[%s] MQTT host=%s port=%s topic=%s", site, mqtt_host, mqtt_port, topic_cmd) driver = MqttGyroDriver(mqtt_host, mqtt_port, mqtt_user, mqtt_pass, topic_cmd) controller = GyroController( site_name=site, db_cfg=db_cfg, alertes_table=alertes_table, mqtt_driver=driver, check_sec=DEF_CHECK_SEC, normal_confirm=DEF_NORMAL_CONFIRM, ) controller.start() try: while True: time.sleep(1) except KeyboardInterrupt: pass finally: controller.stop() driver.close()