# gyro_control.py import os, time, logging, threading import mysql.connector import paho.mqtt.client as mqtt log = logging.getLogger("gyro") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") DEF_CHECK_SEC = int(os.getenv("GYRO_CHECK_SEC", "20")) DEF_NORMAL_CONFIRM = int(os.getenv("GYRO_NORMAL_CONFIRM", "2")) class MqttGyroDriver: def __init__(self, host, port, user, password, topic_cmd): self.topic_cmd = topic_cmd self.client = mqtt.Client() if user: self.client.username_pw_set(user, password or "") self.client.connect(host, int(port or 1883), keepalive=30) self.client.loop_start() def set(self, on: bool): payload = "ON" if on else "OFF" res = self.client.publish(self.topic_cmd, payload=payload, qos=1, retain=False) res.wait_for_publish(timeout=5) log.info("MQTT → %s : %s", self.topic_cmd, payload) def close(self): try: self.client.loop_stop() self.client.disconnect() except Exception: pass class GyroController: """ Gyro ON en continu tant qu'il existe au moins une alerte Etat='En cours'. Gyro OFF après 'normal_confirm' lectures consécutives sans alerte. """ def __init__(self, *, site_name: str, db_cfg: dict, alertes_table: str, mqtt_driver: MqttGyroDriver, check_sec: int = DEF_CHECK_SEC, normal_confirm: int = DEF_NORMAL_CONFIRM): self.site = site_name self.db_cfg = db_cfg self.alertes_table = alertes_table self.mqtt = mqtt_driver self.check_sec = check_sec self.normal_confirm = normal_confirm self._stop = threading.Event() self._thread = None self._current_on = None self._normal_count = 0 def _set_gyro(self, on: bool): if self._current_on is not on: self.mqtt.set(on) self._current_on = on def _has_active_alert(self, cur) -> bool: cur.execute(f"SELECT COUNT(*) FROM `{self.alertes_table}` WHERE Etat='En cours'") return cur.fetchone()[0] > 0 def start(self): if self._thread and self._thread.is_alive(): return self._stop.clear() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() log.info("[%s] GyroController démarré (check=%ss, confirm=%d)", self.site, self.check_sec, self.normal_confirm) def stop(self): self._stop.set() def _connect_mysql(self): while not self._stop.is_set(): try: cnx = mysql.connector.connect(autocommit=True, **self.db_cfg) cur = cnx.cursor() return cnx, cur except Exception as e: log.error("[%s] Connexion MySQL KO (%s). Retry 5s…", self.site, e) time.sleep(5) return None, None def _run(self): cnx, cur = self._connect_mysql() if not cnx: return try: # au démarrage, on force OFF par sécurité (optionnel) try: self._set_gyro(False) except Exception: pass while not self._stop.is_set(): try: active = self._has_active_alert(cur) except Exception as e: log.error("[%s] Lecture alertes KO: %s -> reconnexion MySQL", self.site, e) try: cur.close(); cnx.close() except Exception: pass cnx, cur = self._connect_mysql() if not cnx: break active = False if active: self._normal_count = 0 self._set_gyro(True) else: self._normal_count += 1 if self._normal_count >= self.normal_confirm: self._set_gyro(False) time.sleep(self.check_sec) finally: try: self._set_gyro(False) except Exception: pass try: cur.close(); cnx.close() except Exception: pass log.info("[%s] GyroController stoppé", self.site) if __name__ == "__main__": # ---- CONFIG À ADAPTER ---- SITE = "Meudon" ALERTES_TABLE = "Alertes_Meudon" # adaptez au nom réel DB_CFG = dict( host=(os.getenv("DB_HOST") or "162.19.78.131").strip(), user=(os.getenv("DB_USER") or "sondes").strip(), password=os.getenv("DB_PASSWORD") or "TX.)-U1!zq5Axdk4", database=(os.getenv("DB_NAME") or "Sondes").strip(), port=int(os.getenv("DB_PORT") or 3306), ) MQTT_HOST = (os.getenv("MQTT_HOST") or "162.19.78.131").strip() MQTT_PORT = int(os.getenv("MQTT_PORT") or 1883) MQTT_USER = os.getenv("MQTT_USER") or "sondes" MQTT_PASS = os.getenv("MQTT_PASSWORD") or "3J@bjYP0" TOPIC_CMD = "Meudon/gyrophare/cmd" print("MQTT_HOST =", repr(MQTT_HOST)) print("MQTT_PORT =", repr(MQTT_PORT)) drv = MqttGyroDriver(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS, TOPIC_CMD) ctl = GyroController(site_name=SITE, db_cfg=DB_CFG, alertes_table=ALERTES_TABLE, mqtt_driver=drv, check_sec=DEF_CHECK_SEC, normal_confirm=DEF_NORMAL_CONFIRM) ctl.start() try: while True: time.sleep(1) except KeyboardInterrupt: pass finally: ctl.stop() drv.close()