Files
Gestion_sondes/app/gyro_control.py

169 lines
5.6 KiB
Python

# gyro_control.py
import os, time, logging, threading
import mysql.connector
import paho.mqtt.client as mqtt
# Root logging is configured at import time: INFO level, with timestamp,
# level and logger name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("gyro")

# Defaults for GyroController, overridable through the environment.
# GYRO_CHECK_SEC: delay between two polls of the alert table, in seconds.
DEF_CHECK_SEC = int(os.environ.get("GYRO_CHECK_SEC", "20"))
# GYRO_NORMAL_CONFIRM: number of consecutive alert-free polls before OFF.
DEF_NORMAL_CONFIRM = int(os.environ.get("GYRO_NORMAL_CONFIRM", "2"))
class MqttGyroDriver:
    """Send ON/OFF commands to a beacon over MQTT.

    The constructor connects to the broker and starts the paho network loop
    in a background thread; call :meth:`close` to stop it.

    Args:
        host: Broker host name or address.
        port: Broker port; a falsy value falls back to 1883.
        user: Optional user name; credentials are only set when truthy.
        password: Password sent with ``user`` (``None`` becomes ``""``).
        topic_cmd: Topic on which the ``"ON"`` / ``"OFF"`` payload is published.
        qos: QoS level used for every command (default 1, as before).
        publish_timeout: Seconds to wait for the broker to acknowledge a
            command before reporting it as unconfirmed (default 5, as before).
    """

    def __init__(self, host, port, user, password, topic_cmd, *,
                 qos=1, publish_timeout=5):
        self.topic_cmd = topic_cmd
        self.qos = qos
        self.publish_timeout = publish_timeout
        # NOTE(review): paho-mqtt 2.x changed the Client constructor
        # (callback_api_version argument) -- confirm against the pinned version.
        self.client = mqtt.Client()
        if user:
            self.client.username_pw_set(user, password or "")
        self.client.connect(host, int(port or 1883), keepalive=30)
        self.client.loop_start()

    def set(self, on: bool):
        """Publish ``"ON"`` or ``"OFF"`` and wait for the acknowledgement.

        A timeout is logged as a warning instead of being reported as a
        success.  Errors raised by paho while waiting (e.g. when the message
        could not be handed to the network layer) propagate to the caller,
        exactly as before.
        """
        payload = "ON" if on else "OFF"
        res = self.client.publish(self.topic_cmd, payload=payload,
                                  qos=self.qos, retain=False)
        res.wait_for_publish(timeout=self.publish_timeout)
        if res.is_published():
            log.info("MQTT → %s : %s", self.topic_cmd, payload)
        else:
            # wait_for_publish() returns silently on timeout; do not pretend
            # the command reached the broker.
            log.warning("MQTT → %s : %s non confirmé après %ss",
                        self.topic_cmd, payload, self.publish_timeout)

    def close(self):
        """Stop the network loop and disconnect (best effort, never raises)."""
        try:
            self.client.loop_stop()
            self.client.disconnect()
        except Exception as e:
            # Shutdown must not fail, but the reason is worth keeping.
            log.warning("Fermeture MQTT KO: %s", e)
class GyroController:
    """Drive a beacon ("gyro") from the alert table of one site.

    The beacon stays ON for as long as at least one row with
    ``Etat='En cours'`` exists in ``alertes_table``.  It is switched OFF once
    ``normal_confirm`` successful polls in a row found no such row.  A poll
    that fails (database error) is not counted as an alert-free reading.

    Polling runs in a daemon thread started by :meth:`start` and ended by
    :meth:`stop`.

    Args:
        site_name: Label used in log messages.
        db_cfg: Keyword arguments forwarded to ``mysql.connector.connect``.
        alertes_table: Name of the alert table to poll.
        mqtt_driver: Object whose ``set(bool)`` method switches the beacon.
        check_sec: Delay between two polls, in seconds.
        normal_confirm: Consecutive alert-free polls required before OFF.
    """

    def __init__(self, *, site_name: str, db_cfg: dict, alertes_table: str,
                 mqtt_driver: MqttGyroDriver, check_sec: int = DEF_CHECK_SEC,
                 normal_confirm: int = DEF_NORMAL_CONFIRM):
        self.site = site_name
        self.db_cfg = db_cfg
        self.alertes_table = alertes_table
        self.mqtt = mqtt_driver
        self.check_sec = check_sec
        self.normal_confirm = normal_confirm
        self._stop = threading.Event()
        self._thread = None
        # Last state successfully sent; None means "unknown, must send".
        self._current_on = None
        self._normal_count = 0

    def _set_gyro(self, on: bool):
        """Send the command only when it differs from the last one sent.

        The cached state is updated after the driver call returns, so a
        command that raised is retried on the next call.
        """
        on = bool(on)
        if self._current_on is not on:
            self.mqtt.set(on)
            self._current_on = on

    def _has_active_alert(self, cur) -> bool:
        """Return True when at least one alert row has ``Etat='En cours'``."""
        # Identifiers cannot be bound as parameters; double any backtick so
        # the table name cannot terminate the quoted identifier.
        table = self.alertes_table.replace("`", "``")
        cur.execute(f"SELECT COUNT(*) FROM `{table}` WHERE Etat='En cours'")
        return cur.fetchone()[0] > 0

    def start(self):
        """Start the polling thread; no-op when it is already running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        log.info("[%s] GyroController démarré (check=%ss, confirm=%d)",
                 self.site, self.check_sec, self.normal_confirm)

    def stop(self, timeout=10.0):
        """Ask the polling thread to stop and wait for it to finish.

        Waiting matters: the thread sends the final OFF command on its way
        out, and the caller typically closes the MQTT driver right after
        ``stop()`` returns.

        Args:
            timeout: Maximum seconds to wait for the thread (``None`` waits
                indefinitely, ``0`` does not wait).
        """
        self._stop.set()
        worker = self._thread
        if worker is None or worker is threading.current_thread():
            return
        worker.join(timeout)
        if worker.is_alive():
            log.warning("[%s] GyroController toujours actif après %ss",
                        self.site, timeout)

    def _connect_mysql(self):
        """Open a connection and cursor, retrying every 5 s until stopped.

        Returns:
            ``(connection, cursor)``, or ``(None, None)`` when a stop was
            requested before a connection could be established.
        """
        while not self._stop.is_set():
            cnx = None
            try:
                cnx = mysql.connector.connect(autocommit=True, **self.db_cfg)
                return cnx, cnx.cursor()
            except Exception as e:
                # Do not leak a connection whose cursor could not be created.
                self._close_mysql(cnx, None)
                log.error("[%s] Connexion MySQL KO (%s). Retry 5s…", self.site, e)
                # Event.wait returns as soon as stop() is called.
                self._stop.wait(5)
        return None, None

    def _close_mysql(self, cnx, cur):
        """Close cursor then connection, ignoring errors and ``None``."""
        for resource in (cur, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                log.debug("[%s] Fermeture MySQL KO: %s", self.site, e)

    def _apply_reading(self, active: bool):
        """Update the confirmation counter and the beacon for one reading."""
        if active:
            self._normal_count = 0
            self._set_gyro(True)
        else:
            self._normal_count += 1
            if self._normal_count >= self.normal_confirm:
                self._set_gyro(False)

    def _run(self):
        """Thread body: poll the alert table until :meth:`stop` is called."""
        cnx, cur = self._connect_mysql()
        if not cnx:
            return
        try:
            # At start-up, force OFF as a precaution (optional).
            try:
                self._set_gyro(False)
            except Exception as e:
                log.warning("[%s] OFF initial KO: %s", self.site, e)
            while not self._stop.is_set():
                try:
                    active = self._has_active_alert(cur)
                except Exception as e:
                    log.error("[%s] Lecture alertes KO: %s -> reconnexion MySQL", self.site, e)
                    self._close_mysql(cnx, cur)
                    cnx, cur = self._connect_mysql()
                    if not cnx:
                        break
                    # Unknown state: a failed read must not count towards
                    # the "no alert" confirmations.
                    active = None
                if active is not None:
                    try:
                        self._apply_reading(active)
                    except Exception as e:
                        # Keep the thread alive; _current_on was not updated,
                        # so the command is retried at the next poll.
                        log.error("[%s] Commande gyro KO: %s", self.site, e)
                # Interruptible sleep so stop() takes effect immediately.
                self._stop.wait(self.check_sec)
        finally:
            try:
                self._set_gyro(False)
            except Exception as e:
                log.warning("[%s] OFF final KO: %s", self.site, e)
            self._close_mysql(cnx, cur)
            log.info("[%s] GyroController stoppé", self.site)
if __name__ == "__main__":
    # ---- CONFIGURATION TO ADAPT ----
    SITE = "Meudon"
    ALERTES_TABLE = "Alertes_Meudon"  # adapt to the real table name

    # NOTE(security): the fallback passwords below are committed in clear
    # text.  They are kept only so existing deployments keep working; set
    # DB_PASSWORD / MQTT_PASSWORD in the environment, rotate these values and
    # remove the fallbacks.
    db_password = os.getenv("DB_PASSWORD")
    if not db_password:
        log.warning("DB_PASSWORD non défini: mot de passe par défaut codé en dur utilisé")
        db_password = "TX.)-U1!zq5Axdk4"
    mqtt_password = os.getenv("MQTT_PASSWORD")
    if not mqtt_password:
        log.warning("MQTT_PASSWORD non défini: mot de passe par défaut codé en dur utilisé")
        mqtt_password = "3J@bjYP0"

    DB_CFG = dict(
        host=(os.getenv("DB_HOST") or "162.19.78.131").strip(),
        user=(os.getenv("DB_USER") or "sondes").strip(),
        password=db_password,
        database=(os.getenv("DB_NAME") or "Sondes").strip(),
        port=int(os.getenv("DB_PORT") or 3306),
    )
    MQTT_HOST = (os.getenv("MQTT_HOST") or "162.19.78.131").strip()
    MQTT_PORT = int(os.getenv("MQTT_PORT") or 1883)
    MQTT_USER = os.getenv("MQTT_USER") or "sondes"
    MQTT_PASS = mqtt_password
    TOPIC_CMD = "Meudon/gyrophare/cmd"

    print("MQTT_HOST =", repr(MQTT_HOST))
    print("MQTT_PORT =", repr(MQTT_PORT))

    drv = MqttGyroDriver(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS, TOPIC_CMD)
    ctl = GyroController(site_name=SITE, db_cfg=DB_CFG, alertes_table=ALERTES_TABLE,
                         mqtt_driver=drv, check_sec=DEF_CHECK_SEC,
                         normal_confirm=DEF_NORMAL_CONFIRM)
    ctl.start()
    try:
        # The controller runs in a daemon thread; keep the process alive
        # until Ctrl-C.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the controller before closing the MQTT connection it uses.
        ctl.stop()
        drv.close()