# gyro_control.py
"""Poll a MySQL alert table and pulse a "gyro" output over MQTT while alerts are open."""
import enum
import logging
import os
import threading
import time

import mysql.connector  # pip install mysql-connector-python
import paho.mqtt.client as mqtt  # pip install paho-mqtt

log = logging.getLogger("gyro")

# Default settings (overridable through environment variables or constructor arguments).
DEF_CHECK_SEC = int(os.getenv("GYRO_CHECK_SEC", "20"))
DEF_PULSE_SEC = int(os.getenv("GYRO_PULSE_SEC", "60"))
DEF_COOLDOWN_SEC = int(os.getenv("GYRO_COOLDOWN_SEC", "600"))
DEF_NORMAL_CONFIRM = int(os.getenv("GYRO_NORMAL_CONFIRM", "2"))

# Delay between attempts while the initial MySQL connection cannot be opened.
_DB_RETRY_SEC = 5
# Upper bound on the wait for a published command to be acknowledged.
_PUBLISH_TIMEOUT_SEC = 5


class GyroState(enum.Enum):
    """States of the pulse/cooldown cycle run by ``GyroController``."""

    IDLE = 0      # no alert being signalled
    PULSE_ON = 1  # ON command sent, waiting for the pulse to end
    COOLDOWN = 2  # OFF command sent, waiting before a possible re-pulse


class MqttGyroDriver:
    """Publish ON/OFF commands for the gyro on a single MQTT topic."""

    def __init__(self, host, port, user, password, topic_command):
        """Connect to the broker and start the client's background network loop.

        Args:
            host: Broker host name or address.
            port: Broker port; a falsy value falls back to 1883.
            user: User name; credentials are only set when this is truthy.
            password: Password; ``None`` is sent as an empty string.
            topic_command: Topic on which "ON"/"OFF" payloads are published.

        Raises:
            Whatever ``Client.connect`` raises when the broker is unreachable.
        """
        self.topic_command = topic_command
        # paho-mqtt 2.x wants an explicit callback API version; 1.x has no
        # such enum. No callbacks are registered here, so the version chosen
        # has no effect on this driver.
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is None:
            self.client = mqtt.Client()
        else:
            self.client = mqtt.Client(api_versions.VERSION2)
        if user:
            self.client.username_pw_set(user, password or "")
        self.client.connect(host, int(port or 1883), keepalive=30)
        self.client.loop_start()

    def set(self, on: bool) -> None:
        """Publish "ON" or "OFF" (QoS 1, not retained) and wait for the acknowledgement.

        A publish that is still unacknowledged after the timeout is logged as
        a warning instead of being reported as sent. Errors raised by the
        client (for example when the message could not be queued) propagate
        to the caller.
        """
        payload = "ON" if on else "OFF"
        res = self.client.publish(self.topic_command, payload=payload, qos=1, retain=False)
        res.wait_for_publish(timeout=_PUBLISH_TIMEOUT_SEC)
        if res.is_published():
            log.info("MQTT → %s : %s", self.topic_command, payload)
        else:
            log.warning("MQTT → %s : %s non confirmé après %ss",
                        self.topic_command, payload, _PUBLISH_TIMEOUT_SEC)

    def close(self) -> None:
        """Stop the network loop and disconnect; best effort, never raises."""
        # Each step is attempted independently so that a failure of the first
        # does not prevent the second.
        for step in (self.client.loop_stop, self.client.disconnect):
            try:
                step()
            except Exception as e:
                log.debug("Fermeture MQTT KO: %s", e)


class GyroController:
    """Independent, lightweight loop: reads the alert state in SQL and pulses the gyro via MQTT.

    While an alert is active the gyro is switched ON for ``pulse_sec``, then
    OFF for ``cooldown_sec``, and so on. After ``normal_confirm`` consecutive
    polls without an active alert the controller switches the gyro OFF (if it
    was ON) and returns to ``GyroState.IDLE``.
    """

    def __init__(
        self,
        *,
        site_name: str,
        db_cfg: dict,
        alertes_table: str,
        mqtt_driver: MqttGyroDriver,
        check_sec: int = DEF_CHECK_SEC,
        pulse_sec: int = DEF_PULSE_SEC,
        cooldown_sec: int = DEF_COOLDOWN_SEC,
        normal_confirm: int = DEF_NORMAL_CONFIRM,
    ):
        """Store the configuration; no thread is started and no I/O is done here.

        Args:
            site_name: Label used as a prefix in log messages.
            db_cfg: Keyword arguments passed to ``mysql.connector.connect``.
            alertes_table: Name of the table polled for ``Etat='En cours'`` rows.
            mqtt_driver: Object whose ``set(bool)`` method switches the gyro.
            check_sec: Seconds between two polls.
            pulse_sec: Duration of an ON pulse, in seconds.
            cooldown_sec: Duration of the OFF pause between pulses, in seconds.
            normal_confirm: Consecutive alert-free polls needed to return to IDLE.
        """
        self.site = site_name
        self.db_cfg = db_cfg
        self.alertes_table = alertes_table
        self.mqtt = mqtt_driver
        self.check_sec = check_sec
        self.pulse_sec = pulse_sec
        self.cooldown_sec = cooldown_sec
        self.normal_confirm = normal_confirm

        self.state = GyroState.IDLE
        # Deadlines are expressed on the time.monotonic() clock.
        self._t_pulse_end = 0.0
        self._t_cooldown_end = 0.0
        self._normal_count = 0
        self._stop = threading.Event()
        # Last command successfully sent; None until the first command.
        self._current_gyro_on = None
        self._thread = None

    # --- helpers ---
    def _set_gyro(self, on: bool) -> None:
        """Send the command only when it differs from the last one sent.

        The cached value is updated after the driver call returns, so a
        failed publish is attempted again on the next request.
        """
        on = bool(on)
        if self._current_gyro_on is not on:
            self.mqtt.set(on)
            self._current_gyro_on = on

    def _has_active_alert(self, cur) -> bool:
        """Return True when the alert table holds at least one ``Etat='En cours'`` row."""
        # Identifiers cannot be bound as query parameters; doubling backticks
        # keeps the table name inside its quoted identifier.
        table = self.alertes_table.replace("`", "``")
        cur.execute(f"SELECT COUNT(*) FROM `{table}` WHERE Etat='En cours'")
        row = cur.fetchone()
        return bool(row) and row[0] > 0

    def _try_connect_db(self, retry_sec):
        """Make one connection attempt.

        Args:
            retry_sec: Delay before the caller's next attempt; only used in
                the log message.

        Returns:
            ``(connection, cursor)`` on success, ``(None, None)`` on failure.
        """
        cnx = None
        try:
            # autocommit is on, presumably so that each poll sees freshly
            # committed rows rather than a long-lived transaction snapshot.
            cnx = mysql.connector.connect(autocommit=True, **self.db_cfg)
            return cnx, cnx.cursor()
        except Exception as e:
            log.error("[%s] Connexion MySQL KO (%s). Retry %ss…", self.site, e, retry_sec)
            self._close_db(cnx, None)
            return None, None

    @staticmethod
    def _close_db(cnx, cur) -> None:
        """Close the cursor then the connection, ignoring ``None`` and logging failures."""
        for resource in (cur, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                log.debug("Fermeture MySQL KO: %s", e)

    def _step(self, active: bool, now: float) -> None:
        """Advance the state machine by one poll.

        Args:
            active: Whether an alert is currently active.
            now: Current ``time.monotonic()`` reading.

        The state and deadlines are only updated after the corresponding
        gyro command went through, so an exception from the driver leaves the
        state untouched and the transition is retried on the next poll.
        """
        if self.state is GyroState.IDLE:
            if active:
                self._set_gyro(True)
                self._t_pulse_end = now + self.pulse_sec
                self.state = GyroState.PULSE_ON
                self._normal_count = 0
                log.info("[%s] Gyro ON (pulse %ss)", self.site, self.pulse_sec)

        elif self.state is GyroState.PULSE_ON:
            if active:
                self._normal_count = 0
                if now >= self._t_pulse_end:
                    self._set_gyro(False)
                    self._t_cooldown_end = now + self.cooldown_sec
                    self.state = GyroState.COOLDOWN
                    log.info("[%s] Gyro OFF → cooldown %ss", self.site, self.cooldown_sec)
            elif self._normal_count + 1 >= self.normal_confirm:
                self._set_gyro(False)
                self.state = GyroState.IDLE
                self._normal_count = 0
                log.info("[%s] Gyro OFF (retour à la normale)", self.site)
            else:
                self._normal_count += 1

        elif self.state is GyroState.COOLDOWN:
            if active:
                self._normal_count = 0
                if now >= self._t_cooldown_end:
                    self._set_gyro(True)
                    self._t_pulse_end = now + self.pulse_sec
                    self.state = GyroState.PULSE_ON
                    log.info("[%s] Gyro ON (re-pulse)", self.site)
            elif self._normal_count + 1 >= self.normal_confirm:
                self.state = GyroState.IDLE
                self._normal_count = 0
                log.info("[%s] Retour IDLE", self.site)
            else:
                self._normal_count += 1

    # --- lifecycle ---
    def start(self) -> None:
        """Start the polling thread (daemon); does nothing if it is already running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        log.info("[%s] GyroController démarré (check=%ss, pulse=%ss, cooldown=%ss, confirm=%d)",
                 self.site, self.check_sec, self.pulse_sec, self.cooldown_sec, self.normal_confirm)

    def stop(self) -> None:
        """Ask the polling thread to stop; returns without waiting for it."""
        self._stop.set()

    # --- main loop ---
    def _run(self) -> None:
        """Thread body: connect to MySQL, poll every ``check_sec``, then clean up.

        Waits are done on the stop event so that ``stop()`` interrupts them
        immediately. On exit the gyro is switched OFF and the database
        resources are closed.
        """
        cnx, cur = None, None

        # Initial connection: retry until it succeeds or a stop is requested.
        while not self._stop.is_set():
            cnx, cur = self._try_connect_db(_DB_RETRY_SEC)
            if cur is not None:
                break
            self._stop.wait(_DB_RETRY_SEC)

        try:
            while not self._stop.is_set():
                # A previous read failure dropped the connection: one new
                # attempt per cycle, so the state machine keeps running.
                if cur is None:
                    cnx, cur = self._try_connect_db(self.check_sec)

                active = False  # cautious default when the state cannot be read
                if cur is not None:
                    try:
                        active = self._has_active_alert(cur)
                    except Exception as e:
                        log.error("[%s] Lecture alertes KO: %s", self.site, e)
                        self._close_db(cnx, cur)
                        cnx, cur = None, None

                try:
                    self._step(active, time.monotonic())
                except Exception:
                    # Typically a failed MQTT publish: keep the thread alive,
                    # the same transition is attempted again next cycle.
                    log.exception("[%s] Commande gyro KO", self.site)

                self._stop.wait(self.check_sec)
        finally:
            try:
                self._set_gyro(False)
            except Exception as e:
                log.warning("[%s] Extinction finale du gyro KO: %s", self.site, e)
            self._close_db(cnx, cur)
            log.info("[%s] GyroController stoppé", self.site)