Files
Gestion_sondes/app/gyro_control.py
2025-09-22 15:02:35 +02:00

165 lines
6.2 KiB
Python

# gyro_control.py
import os, time, enum, logging, threading
import mysql.connector # pip install mysql-connector-python
import paho.mqtt.client as mqtt # pip install paho-mqtt
log = logging.getLogger("gyro")
# Paramètres par défaut (surclassables via env ou arguments)
DEF_CHECK_SEC = int(os.getenv("GYRO_CHECK_SEC", "20"))
DEF_PULSE_SEC = int(os.getenv("GYRO_PULSE_SEC", "60"))
DEF_COOLDOWN_SEC = int(os.getenv("GYRO_COOLDOWN_SEC", "600"))
DEF_NORMAL_CONFIRM = int(os.getenv("GYRO_NORMAL_CONFIRM", "2"))
class GyroState(enum.Enum):
IDLE = 0
PULSE_ON = 1
COOLDOWN = 2
class MqttGyroDriver:
def __init__(self, host, port, user, password, topic_command):
self.topic_command = topic_command
self.client = mqtt.Client()
if user:
self.client.username_pw_set(user, password or "")
self.client.connect(host, int(port or 1883), keepalive=30)
self.client.loop_start()
def set(self, on: bool):
payload = "ON" if on else "OFF"
res = self.client.publish(self.topic_command, payload=payload, qos=1, retain=False)
res.wait_for_publish(timeout=5)
log.info("MQTT → %s : %s", self.topic_command, payload)
def close(self):
try:
self.client.loop_stop(); self.client.disconnect()
except Exception:
pass
class GyroController:
"""
Boucle indépendante et légère : lit l'état d'alerte en SQL et pulse le gyro via MQTT.
"""
def __init__(
self,
*,
site_name: str,
db_cfg: dict,
alertes_table: str,
mqtt_driver: MqttGyroDriver,
check_sec: int = DEF_CHECK_SEC,
pulse_sec: int = DEF_PULSE_SEC,
cooldown_sec: int = DEF_COOLDOWN_SEC,
normal_confirm: int = DEF_NORMAL_CONFIRM,
):
self.site = site_name
self.db_cfg = db_cfg
self.alertes_table = alertes_table
self.mqtt = mqtt_driver
self.check_sec = check_sec
self.pulse_sec = pulse_sec
self.cooldown_sec = cooldown_sec
self.normal_confirm = normal_confirm
self.state = GyroState.IDLE
self._t_pulse_end = 0.0
self._t_cooldown_end = 0.0
self._normal_count = 0
self._stop = threading.Event()
self._current_gyro_on = None
self._thread = None
# --- helpers ---
def _set_gyro(self, on: bool):
if self._current_gyro_on is not on:
self.mqtt.set(on)
self._current_gyro_on = on
def _has_active_alert(self, cur) -> bool:
cur.execute(f"SELECT COUNT(*) FROM `{self.alertes_table}` WHERE Etat='En cours'")
return cur.fetchone()[0] > 0
# --- lifecycle ---
def start(self):
if self._thread and self._thread.is_alive():
return
self._stop.clear()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
log.info("[%s] GyroController démarré (check=%ss, pulse=%ss, cooldown=%ss, confirm=%d)",
self.site, self.check_sec, self.pulse_sec, self.cooldown_sec, self.normal_confirm)
def stop(self):
self._stop.set()
# --- main loop ---
def _run(self):
# Ouverture connexion MySQL persistante
while not self._stop.is_set():
try:
cnx = mysql.connector.connect(autocommit=True, **self.db_cfg)
cur = cnx.cursor()
break
except Exception as e:
log.error("[%s] Connexion MySQL KO (%s). Retry 5s…", self.site, e)
time.sleep(5)
try:
while not self._stop.is_set():
now = time.time()
try:
active = self._has_active_alert(cur)
except Exception as e:
log.error("[%s] Lecture alertes KO: %s", self.site, e)
active = False # prudence
if self.state == GyroState.IDLE:
if active:
self._set_gyro(True)
self._t_pulse_end = now + self.pulse_sec
self.state = GyroState.PULSE_ON
self._normal_count = 0
log.info("[%s] Gyro ON (pulse %ss)", self.site, self.pulse_sec)
elif self.state == GyroState.PULSE_ON:
if not active:
self._normal_count += 1
if self._normal_count >= self.normal_confirm:
self._set_gyro(False)
self.state = GyroState.IDLE
self._normal_count = 0
log.info("[%s] Gyro OFF (retour à la normale)", self.site)
else:
self._normal_count = 0
if now >= self._t_pulse_end:
self._set_gyro(False)
self._t_cooldown_end = now + self.cooldown_sec
self.state = GyroState.COOLDOWN
log.info("[%s] Gyro OFF → cooldown %ss", self.site, self.cooldown_sec)
elif self.state == GyroState.COOLDOWN:
if not active:
self._normal_count += 1
if self._normal_count >= self.normal_confirm:
self.state = GyroState.IDLE
self._normal_count = 0
log.info("[%s] Retour IDLE", self.site)
else:
self._normal_count = 0
if now >= self._t_cooldown_end:
self._set_gyro(True)
self._t_pulse_end = now + self.pulse_sec
self.state = GyroState.PULSE_ON
log.info("[%s] Gyro ON (re-pulse)", self.site)
time.sleep(self.check_sec)
finally:
try:
self._set_gyro(False)
except Exception:
pass
try:
cur.close(); cnx.close()
except Exception:
pass
log.info("[%s] GyroController stoppé", self.site)