169 lines
5.6 KiB
Python
169 lines
5.6 KiB
Python
# gyro_control.py
|
|
import os, time, logging, threading
|
|
import mysql.connector
|
|
import paho.mqtt.client as mqtt
|
|
|
|
log = logging.getLogger("gyro")
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
|
|
|
|
DEF_CHECK_SEC = int(os.getenv("GYRO_CHECK_SEC", "20"))
|
|
DEF_NORMAL_CONFIRM = int(os.getenv("GYRO_NORMAL_CONFIRM", "2"))
|
|
|
|
class MqttGyroDriver:
|
|
def __init__(self, host, port, user, password, topic_cmd):
|
|
self.topic_cmd = topic_cmd
|
|
self.client = mqtt.Client()
|
|
if user:
|
|
self.client.username_pw_set(user, password or "")
|
|
self.client.connect(host, int(port or 1883), keepalive=30)
|
|
self.client.loop_start()
|
|
|
|
def set(self, on: bool):
|
|
payload = "ON" if on else "OFF"
|
|
res = self.client.publish(self.topic_cmd, payload=payload, qos=1, retain=False)
|
|
res.wait_for_publish(timeout=5)
|
|
log.info("MQTT → %s : %s", self.topic_cmd, payload)
|
|
|
|
def close(self):
|
|
try:
|
|
self.client.loop_stop()
|
|
self.client.disconnect()
|
|
except Exception:
|
|
pass
|
|
|
|
class GyroController:
|
|
"""
|
|
Gyro ON en continu tant qu'il existe au moins une alerte Etat='En cours'.
|
|
Gyro OFF après 'normal_confirm' lectures consécutives sans alerte.
|
|
"""
|
|
def __init__(self, *, site_name: str, db_cfg: dict, alertes_table: str,
|
|
mqtt_driver: MqttGyroDriver, check_sec: int = DEF_CHECK_SEC,
|
|
normal_confirm: int = DEF_NORMAL_CONFIRM):
|
|
self.site = site_name
|
|
self.db_cfg = db_cfg
|
|
self.alertes_table = alertes_table
|
|
self.mqtt = mqtt_driver
|
|
self.check_sec = check_sec
|
|
self.normal_confirm = normal_confirm
|
|
|
|
self._stop = threading.Event()
|
|
self._thread = None
|
|
self._current_on = None
|
|
self._normal_count = 0
|
|
|
|
def _set_gyro(self, on: bool):
|
|
if self._current_on is not on:
|
|
self.mqtt.set(on)
|
|
self._current_on = on
|
|
|
|
def _has_active_alert(self, cur) -> bool:
|
|
cur.execute(f"SELECT COUNT(*) FROM `{self.alertes_table}` WHERE Etat='En cours'")
|
|
return cur.fetchone()[0] > 0
|
|
|
|
def start(self):
|
|
if self._thread and self._thread.is_alive():
|
|
return
|
|
self._stop.clear()
|
|
self._thread = threading.Thread(target=self._run, daemon=True)
|
|
self._thread.start()
|
|
log.info("[%s] GyroController démarré (check=%ss, confirm=%d)",
|
|
self.site, self.check_sec, self.normal_confirm)
|
|
|
|
def stop(self):
|
|
self._stop.set()
|
|
|
|
def _connect_mysql(self):
|
|
while not self._stop.is_set():
|
|
try:
|
|
cnx = mysql.connector.connect(autocommit=True, **self.db_cfg)
|
|
cur = cnx.cursor()
|
|
return cnx, cur
|
|
except Exception as e:
|
|
log.error("[%s] Connexion MySQL KO (%s). Retry 5s…", self.site, e)
|
|
time.sleep(5)
|
|
return None, None
|
|
|
|
def _run(self):
|
|
cnx, cur = self._connect_mysql()
|
|
if not cnx:
|
|
return
|
|
|
|
try:
|
|
# au démarrage, on force OFF par sécurité (optionnel)
|
|
try:
|
|
self._set_gyro(False)
|
|
except Exception:
|
|
pass
|
|
|
|
while not self._stop.is_set():
|
|
try:
|
|
active = self._has_active_alert(cur)
|
|
except Exception as e:
|
|
log.error("[%s] Lecture alertes KO: %s -> reconnexion MySQL", self.site, e)
|
|
try:
|
|
cur.close(); cnx.close()
|
|
except Exception:
|
|
pass
|
|
cnx, cur = self._connect_mysql()
|
|
if not cnx:
|
|
break
|
|
active = False
|
|
|
|
if active:
|
|
self._normal_count = 0
|
|
self._set_gyro(True)
|
|
else:
|
|
self._normal_count += 1
|
|
if self._normal_count >= self.normal_confirm:
|
|
self._set_gyro(False)
|
|
|
|
time.sleep(self.check_sec)
|
|
|
|
finally:
|
|
try:
|
|
self._set_gyro(False)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
cur.close(); cnx.close()
|
|
except Exception:
|
|
pass
|
|
log.info("[%s] GyroController stoppé", self.site)
|
|
|
|
if __name__ == "__main__":
|
|
# ---- CONFIG À ADAPTER ----
|
|
SITE = "Meudon"
|
|
ALERTES_TABLE = "Alertes_Meudon" # adaptez au nom réel
|
|
|
|
DB_CFG = dict(
|
|
host=(os.getenv("DB_HOST") or "162.19.78.131").strip(),
|
|
user=(os.getenv("DB_USER") or "sondes").strip(),
|
|
password=os.getenv("DB_PASSWORD") or "TX.)-U1!zq5Axdk4",
|
|
database=(os.getenv("DB_NAME") or "Sondes").strip(),
|
|
port=int(os.getenv("DB_PORT") or 3306),
|
|
)
|
|
|
|
MQTT_HOST = (os.getenv("MQTT_HOST") or "162.19.78.131").strip()
|
|
MQTT_PORT = int(os.getenv("MQTT_PORT") or 1883)
|
|
MQTT_USER = os.getenv("MQTT_USER") or "sondes"
|
|
MQTT_PASS = os.getenv("MQTT_PASSWORD") or "3J@bjYP0"
|
|
|
|
TOPIC_CMD = "Meudon/gyrophare/cmd"
|
|
|
|
print("MQTT_HOST =", repr(MQTT_HOST))
|
|
print("MQTT_PORT =", repr(MQTT_PORT))
|
|
|
|
drv = MqttGyroDriver(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS, TOPIC_CMD)
|
|
ctl = GyroController(site_name=SITE, db_cfg=DB_CFG, alertes_table=ALERTES_TABLE,
|
|
mqtt_driver=drv, check_sec=DEF_CHECK_SEC, normal_confirm=DEF_NORMAL_CONFIRM)
|
|
ctl.start()
|
|
|
|
try:
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
ctl.stop()
|
|
drv.close()
|