Files
Gestion_sondes/app/Monitor_Saclay.py

570 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ========= Site =========
# Name of the monitored site. It is passed as the default `site` argument to the
# monitoring functions, which also use it to build table names.
SITE = "Saclay"
# Program identifier: lower-cased for the logger name and used as the CLI description.
PROGRAM_NAME = f"Monitor_{SITE}"
# ========= Imports & .env =========
import os, re, time, ssl, smtplib, logging
import datetime as dt
from email.message import EmailMessage
from datetime import datetime
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv(usecwd=True), override=False)
# MySQL
import mysql.connector
from mysql.connector import Error as MySQLError
# OVH (SMS)
# Optional dependency: the OVH SDK is only needed to send SMS.
try:
    import ovh
    from ovh.exceptions import APIError as OVHAPIError
    _ovh_available = True
except Exception:
    # SDK missing or broken: keep the names defined so the rest of the module loads.
    ovh = None  # type: ignore

    class OVHAPIError(Exception):
        """Placeholder so `except OVHAPIError` stays valid without the SDK."""

    _ovh_available = False
# MQTT
# Optional dependency: paho-mqtt is only needed to drive the beacon.
try:
    import paho.mqtt.client as mqtt
    _mqtt_ok = True
except Exception:
    # Bind the name as well (as done for `ovh`) so `mqtt` always exists.
    mqtt = None  # type: ignore
    _mqtt_ok = False
# ========= Logger =========
# Module-wide logger, named after the program in lower case.
log = logging.getLogger(PROGRAM_NAME.lower())
if not log.handlers:
    # No handler attached to this logger: configure root logging at INFO level
    # with timestamped lines.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# ========= DB utils =========
def open_alert(conn, table_alertes: str, sonde: str, dt: datetime) -> bool:
    """Open ONE alert for a probe unless one is already 'En cours'.

    Args:
        conn: Open DB connection exposing ``cursor()`` and ``commit()``.
        table_alertes: Alert table name. It is interpolated into the SQL as an
            identifier, so it must come from trusted code only.
        sonde: Probe name, matched against / stored in the ``Sonde`` column.
        dt: Start time of the fault, written to ``Debut_defaut``. Inside this
            function the name shadows the module-level ``dt`` alias.

    Returns:
        True if a new alert row was inserted (the caller should notify),
        False if an 'En cours' alert already existed.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"SELECT 1 FROM `{table_alertes}` WHERE Sonde=%s AND Etat='En cours' LIMIT 1",
            (sonde,)
        )
        if cur.fetchone():
            return False  # already open -> no notification
        cur.execute(
            f"INSERT INTO `{table_alertes}` (Sonde, Debut_defaut, Etat) VALUES (%s, %s, 'En cours')",
            (sonde, dt.strftime('%Y-%m-%d %H:%M:%S'))
        )
        conn.commit()
        return True  # new alert -> notify
    finally:
        # Release the cursor on every path, including when a query raises.
        cur.close()
def close_alert(conn, table_alertes: str, sonde: str) -> bool:
    """Close the most recent 'En cours' alert of a probe, if there is one.

    Args:
        conn: Open DB connection exposing ``cursor()`` and ``commit()``.
        table_alertes: Alert table name. It is interpolated into the SQL as an
            identifier, so it must come from trusted code only.
        sonde: Probe name, matched against the ``Sonde`` column.

    Returns:
        True if exactly one alert moved to 'Acquitté' (the caller should notify),
        False otherwise.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"UPDATE `{table_alertes}` SET Etat='Acquitté' "
            f"WHERE Sonde=%s AND Etat='En cours' "
            f"ORDER BY Debut_defaut DESC LIMIT 1",
            (sonde,)
        )
        changed = (cur.rowcount == 1)
        conn.commit()
        return changed  # True -> notify, False -> nothing to do
    finally:
        # Release the cursor on every path, including when the query raises.
        cur.close()
def get_db():
    """Open and return a new MySQL connection built from the DB_* environment variables.

    DB_NAME defaults to "Sondes" and DB_PORT to 3306; autocommit is enabled.
    """
    settings = {
        "host": os.getenv("DB_HOST"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASS"),
        "database": os.getenv("DB_NAME", "Sondes"),
        "port": int(os.getenv("DB_PORT", "3306")),
        "autocommit": True,
    }
    return mysql.connector.connect(**settings)
def lire_sondes_depuis_db(site: str):
    """Return the latest reading of every probe of a site.

    The measurement table is named after the site. For each probe, the row(s)
    carrying its maximum ``Date`` are selected.

    Args:
        site: Site name, used as the table name. It is interpolated into the
            SQL as an identifier, so it must come from trusted code only.

    Returns:
        A list of dicts with keys ``Sonde``, ``Temperature`` (converted to
        float) and ``Date``. Rows whose temperature is NULL or not numeric are
        skipped with a warning. An empty list is returned on a MySQL error.
    """
    table = site
    sql = f"""
        SELECT t1.Sonde, t1.Temperature, t1.Date
        FROM `{table}` t1
        JOIN (
            SELECT Sonde, MAX(Date) AS MaxDate
            FROM `{table}`
            GROUP BY Sonde
        ) t2 ON t1.Sonde=t2.Sonde AND t1.Date=t2.MaxDate
    """
    cnx = get_db()
    try:
        cur = cnx.cursor(dictionary=True)
        cur.execute(sql)
        rows = []
        for r in cur.fetchall():
            try:
                r["Temperature"] = float(r["Temperature"])
            except (TypeError, ValueError):
                # One unusable reading must not abort the cycle for every other probe.
                log.warning(
                    "Mesure ignorée (température invalide) pour la sonde %s: %r",
                    r.get("Sonde"), r.get("Temperature"),
                )
                continue
            rows.append(r)
        return rows
    except MySQLError as err:
        log.exception("Erreur DB (lire_sondes_depuis_db): %s", err)
        return []
    finally:
        cnx.close()
def lire_cfg_chambres(site: str):
    """Load the cold-room configuration of a site from ``Chambres_froides``.

    Returns a dict ``{sonde: {"temp_max": float, "active": bool, "entretien": bool}}``.
    ``active`` is True when ``Etat`` equals "ON" (case-insensitive) and
    ``entretien`` reflects ``En_entretien`` (NULL counts as 0). On a MySQL
    error the error is logged and the dict built so far is returned.
    """
    query = (
        "\n"
        "SELECT Sonde, Temp_Max, Etat, En_entretien\n"
        "FROM Chambres_froides\n"
        "WHERE Lieu=%s\n"
    )
    cnx = get_db()
    cfg: dict[str, dict] = {}
    try:
        cursor = cnx.cursor()
        cursor.execute(query, (site,))
        for record in cursor.fetchall():
            name, limit, state, maintenance = record
            entry = {
                "temp_max": float(limit),
                "active": str(state).upper() == "ON",
                "entretien": bool(int(maintenance or 0)),
            }
            cfg[str(name)] = entry
    except MySQLError as err:
        log.exception("Erreur DB (lire_cfg_chambres): %s", err)
    finally:
        cnx.close()
    return cfg
def compute_site_alarm(last_values: list[dict], cfg: dict[str, dict], hysteresis: float = 0.0):
    """Decide whether the site beacon must be ON from the latest readings.

    Args:
        last_values: Latest readings, each a dict with ``Sonde`` and ``Temperature``.
        cfg: Per-probe configuration ``{sonde: {"temp_max", "active", "entretien"}}``.
            Probes that are unknown, inactive or under maintenance are ignored.
        hysteresis: Margin added to ``temp_max`` before a reading counts as an
            overrun. The default 0.0 keeps the strict ``temp > temp_max`` test.

    Returns:
        ``(True, (sonde, temp, seuil))`` for the first probe found above its
        threshold plus margin, where ``seuil`` is the configured ``temp_max``;
        ``(False, None)`` when no probe is in overrun.
    """
    margin = float(hysteresis)
    for row in last_values:
        sonde = str(row["Sonde"])
        meta = cfg.get(sonde)
        if not meta or not meta["active"] or meta["entretien"]:
            continue
        temp = float(row["Temperature"])
        seuil = float(meta["temp_max"])
        # Honor the caller-supplied margin instead of a hard-coded 0.0.
        if temp > seuil + margin:
            return True, (sonde, temp, seuil)
    return False, None
def lire_seuils_depuis_db(site: str):
    """Return ``{sonde: Temp_Max}`` for the rows of ``Chambres_froides`` with Etat='ON' for a site.

    On a MySQL error the error is logged and the dict built so far is returned.
    """
    query = (
        "\n"
        "SELECT Sonde, Temp_Max\n"
        "FROM Chambres_froides\n"
        "WHERE Lieu=%s AND Etat='ON'\n"
    )
    cnx = get_db()
    seuils = {}
    try:
        cursor = cnx.cursor()
        cursor.execute(query, (site,))
        for record in cursor.fetchall():
            name, limit = record
            seuils[str(name)] = float(limit)
    except MySQLError as err:
        log.exception("Erreur DB (lire_seuils_depuis_db): %s", err)
    finally:
        cnx.close()
    return seuils
def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool:
    """Tell whether a probe has been above its threshold for at least 30 minutes.

    The current overrun is taken to start at the first reading above ``seuil``
    that follows the most recent reading at or below ``seuil``. Only the last
    120 minutes are examined. An earlier excursion that already came back in
    range therefore does not make a fresh overrun look 30 minutes old.

    Args:
        site: Site name, used as the measurement table name. It is interpolated
            into the SQL as an identifier, so it must come from trusted code only.
        sonde: Probe name, matched against the ``Sonde`` column.
        seuil: Temperature threshold.

    Returns:
        True if the latest reading is above ``seuil`` and the current overrun
        started at least 30 minutes ago. False otherwise, including when the
        probe has no usable reading or a MySQL error occurs.
    """
    table = site
    seuil = float(seuil)
    cnx = get_db()
    try:
        cur = cnx.cursor()
        cur.execute(f"""
            SELECT Temperature
            FROM `{table}`
            WHERE Sonde=%s
            ORDER BY Date DESC
            LIMIT 1
        """, (sonde,))
        last = cur.fetchone()
        if not last or last[0] is None or float(last[0]) <= seuil:
            return False
        # Most recent in-range reading inside the window: the current overrun
        # cannot have started before it.
        cur.execute(f"""
            SELECT MAX(Date)
            FROM `{table}`
            WHERE Sonde=%s
              AND Temperature <= %s
              AND Date >= (NOW() - INTERVAL 120 MINUTE)
        """, (sonde, seuil))
        last_ok = cur.fetchone()[0]
        sql = f"""
            SELECT MIN(Date)
            FROM `{table}`
            WHERE Sonde=%s
              AND Temperature > %s
              AND Date >= (NOW() - INTERVAL 120 MINUTE)
        """
        params = [sonde, seuil]
        if last_ok is not None:
            sql += " AND Date > %s"
            params.append(last_ok)
        cur.execute(sql, tuple(params))
        first_over = cur.fetchone()[0]
        if not first_over:
            return False
        # Compare with a "now" of the same awareness (naive/aware) as the DB value.
        # NOTE(review): this assumes the host clock and the stored dates share a time zone -- confirm.
        now = dt.datetime.now(tz=getattr(first_over, "tzinfo", None))
        return (now - first_over) >= dt.timedelta(minutes=30)
    except MySQLError as err:
        log.exception("Erreur DB (depassement_depuis_30min): %s", err)
        return False
    finally:
        cnx.close()
def any_alert_open(site: str) -> bool:
    """Return True if table ``Alertes_<site>`` holds at least one 'En cours' alert.

    A MySQL error is logged and reported as False.
    """
    table = f"Alertes_{site}"
    cnx = get_db()
    found = False
    try:
        cursor = cnx.cursor()
        cursor.execute(f"SELECT 1 FROM `{table}` WHERE `Etat`='En cours' LIMIT 1")
        found = cursor.fetchone() is not None
    except MySQLError as err:
        log.exception("Erreur DB (any_alert_open): %s", err)
    finally:
        cnx.close()
    return found
# ========= Helpers listes/numéros =========
def _split_list(raw: str | None) -> list[str]:
return [x.strip() for x in re.split(r"[;,]", raw or "") if x.strip()]
def _parse_labeled_phones(raw: str | None) -> list[tuple[str, str]]:
out: list[tuple[str, str]] = []
for tok in re.split(r"[;,]", raw or ""):
tok = tok.strip()
if not tok:
continue
if ":" in tok:
name, num = tok.split(":", 1)
out.append((name.strip(), num.strip()))
else:
out.append(("", tok))
return out
def _resolve_sms_receivers(labeled: list[tuple[str, str]]) -> list[str]:
    """Return the phone numbers to text, optionally filtered by ALERT_SMS_ONLY.

    When the ALERT_SMS_ONLY environment variable is unset or empty, every number
    is returned. Otherwise it is a ';' or ',' separated allow-list, and an entry
    is kept when its non-empty name or its number appears in that list.
    """
    only = os.getenv("ALERT_SMS_ONLY")
    if not only:
        return [number for (_label, number) in labeled]
    allow = set()
    for item in re.split(r"[;,]", only):
        item = item.strip()
        if item:
            allow.add(item)
    selected: list[str] = []
    for label, number in labeled:
        if (label and label in allow) or (number in allow):
            selected.append(number)
    return selected
def _human_labeled_list(labeled: list[tuple[str, str]]) -> str:
    """Render ``(name, number)`` pairs as ``name(number)`` (or the bare number), comma-separated."""
    rendered = []
    for label, phone in labeled:
        if label:
            rendered.append(f"{label}({phone})")
        else:
            rendered.append(phone)
    return ", ".join(rendered)
# ========= Notifier (SMS + Mail) =========
class Notifier:
    """Send alert notifications by SMS (OVH API) and by e-mail (SMTP).

    Everything is configured from environment variables read once in
    ``__init__``. Each channel is optional: when its configuration is
    incomplete, the matching ``send_*`` method logs a warning and returns
    False instead of raising.
    """

    def __init__(self):
        # OVH SMS -- these attributes always exist, even when the channel is disabled.
        self.ovh_client = None
        self.ovh_service = None
        self.ovh_sender = None
        self.sms_labeled: list[tuple[str, str]] = []
        self.ovh_enabled = _ovh_available and all(
            os.getenv(k) for k in ("OVH_APPLICATION_KEY","OVH_APPLICATION_SECRET","OVH_CONSUMER_KEY","OVH_SMS_SERVICE","OVH_SMS_SENDER")
        )
        if self.ovh_enabled:
            self.ovh_client = ovh.Client(
                endpoint=os.getenv("OVH_ENDPOINT","ovh-eu"),
                application_key=os.getenv("OVH_APPLICATION_KEY"),
                application_secret=os.getenv("OVH_APPLICATION_SECRET"),
                consumer_key=os.getenv("OVH_CONSUMER_KEY"),
            )
            self.ovh_service = os.getenv("OVH_SMS_SERVICE")
            self.ovh_sender = os.getenv("OVH_SMS_SENDER")
            # Site-specific variable first (exact case, then upper case), then the generic one.
            raw_sms = (
                os.getenv(f"ALERT_SMS_TO_{SITE}")
                or os.getenv(f"ALERT_SMS_TO_{SITE.upper()}")
                or os.getenv("ALERT_SMS_TO")
            )
            self.sms_labeled = _parse_labeled_phones(raw_sms)
        # SMTP
        self.smtp_host = os.getenv("SMTP_HOST")
        # `or` also covers a variable that is set but empty.
        self.smtp_port = int(os.getenv("SMTP_PORT") or "465")
        self.smtp_user = os.getenv("SMTP_USER")
        self.smtp_pass = os.getenv("SMTP_PASS")
        self.smtp_security = (os.getenv("SMTP_SECURITY","SSL") or "SSL").upper()
        raw_mail_to = (
            os.getenv(f"MAIL_TO_{SITE}")
            or os.getenv(f"MAIL_TO_{SITE.upper()}")
            or os.getenv("MAIL_TO")
            or ""
        )
        self.mail_to = _split_list(raw_mail_to)
        self.mail_from = (
            os.getenv(f"MAIL_FROM_{SITE}")
            or os.getenv(f"MAIL_FROM_{SITE.upper()}")
            or os.getenv("MAIL_FROM")
            or self.smtp_user
        )
        self.smtp_enabled = all([self.smtp_host, self.smtp_port, self.smtp_user, self.smtp_pass, self.mail_to])

    def send_sms(self, message: str, tag: str = f"monitor-{SITE.lower()}") -> bool:
        """Send ``message`` by SMS to the configured recipients through the OVH API.

        Args:
            message: Text to send; truncated to 1600 characters.
            tag: Tag attached to the OVH SMS job.

        Returns:
            True once the job has been accepted by the API. False when SMS is
            disabled, when no recipient remains after the ALERT_SMS_ONLY
            filter, or when the API call fails.
        """
        if not self.ovh_enabled or not self.sms_labeled:
            log.warning("SMS désactivé ou aucun destinataire.")
            return False
        receivers = _resolve_sms_receivers(self.sms_labeled)
        if not receivers:
            log.warning("ALERT_SMS_ONLY filtre tous les destinataires (aucun envoi).")
            return False
        payload = {
            "sender": self.ovh_sender,
            "receivers": receivers,
            "message": message[:1600],
            "priority": "high",
            "coding": "7bit",
            "class": "phoneDisplay",
            "noStopClause": True,
            "senderForResponse": False,
            "validityPeriod": 2880,
            "tag": tag,
        }
        try:
            log.info("Envoi SMS vers: %s", _human_labeled_list([(n,p) for (n,p) in self.sms_labeled if p in receivers]))
            resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
            ids = resp.get("ids") or []
            log.info("SMS OVH envoyé (job ids=%s)", ids)
            # Best-effort status polling: a failure here does not change the result.
            try:
                if ids:
                    job_id = ids[0]
                    for _ in range(3):
                        job = self.ovh_client.get(f"/sms/{self.ovh_service}/jobs/{job_id}")
                        if job.get("status") in ("done","error","cancelled"):
                            log.info("Statut job SMS: %s", job.get("status"))
                            break
                        time.sleep(1.5)
            except Exception as e:
                log.debug("Suivi job OVH indisponible (OK): %s", e)
            return True
        except OVHAPIError as err:
            log.exception("Erreur API OVH: %s", err)
            return False
        except Exception as err:
            log.exception("Echec envoi SMS OVH: %s", err)
            return False

    def send_email(self, subject: str, body: str) -> bool:
        """Send a plain-text e-mail to the configured recipients.

        With SMTP_SECURITY=STARTTLS, the message goes through STARTTLS on
        ``smtp_port`` and falls back to implicit SSL on port 465 when the
        connection fails. Otherwise it goes through implicit SSL on
        ``smtp_port`` (SMTP_PORT, default 465).

        Returns:
            True if the message was handed to the server, False when SMTP is
            not configured or sending fails.
        """
        if not self.smtp_enabled:
            log.warning("SMTP non configuré, email non envoyé.")
            return False
        msg = EmailMessage()
        msg["From"] = self.mail_from
        msg["To"] = ", ".join(self.mail_to)
        msg["Subject"] = subject
        msg.set_content(body)
        timeout = int(os.getenv("SMTP_TIMEOUT") or "60")
        debug = os.getenv("SMTP_DEBUG","0") == "1"

        def _send_ssl(port: int):
            with smtplib.SMTP_SSL(self.smtp_host, port, context=ssl.create_default_context(), timeout=timeout) as server:
                if debug:
                    server.set_debuglevel(1)
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        def _send_starttls():
            with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=timeout) as server:
                if debug:
                    server.set_debuglevel(1)
                server.ehlo()
                server.starttls(context=ssl.create_default_context())
                server.ehlo()
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        try:
            if self.smtp_security == "STARTTLS":
                try:
                    _send_starttls()
                except (smtplib.SMTPServerDisconnected, TimeoutError, smtplib.SMTPConnectError) as err:
                    log.warning("STARTTLS/587 a échoué (%s). Tentative en SSL/465...", err)
                    # smtp_port is the STARTTLS port here, so the fallback uses the standard SSL port.
                    _send_ssl(465)
            else:
                # Honor SMTP_PORT in SSL mode (it used to be ignored in favor of 465).
                _send_ssl(self.smtp_port)
            log.info("Email envoyé à %s", self.mail_to)
            return True
        except (smtplib.SMTPException, ssl.SSLError, TimeoutError) as err:
            log.exception("Erreur SMTP: %s", err)
            return False
        except Exception as err:
            log.exception("Echec envoi email: %s", err)
            return False
# ========= Mise en forme messages =========
from zoneinfo import ZoneInfo
# Time zone used by now_paris() to produce timezone-aware timestamps.
PARIS = ZoneInfo("Europe/Paris")
def fmt_deg(v: float) -> str:
    """Format a temperature with one decimal, a decimal comma and a '°C' suffix."""
    number = format(float(v), ".1f")
    return number.replace(".", ",") + "°C"
def now_paris() -> dt.datetime:
    """Return the current time as a timezone-aware datetime in the PARIS zone."""
    current = dt.datetime.now(PARIS)
    return current
def build_alert_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
    """Build the texts announcing that a probe is above its threshold.

    ``when`` defaults to the current Paris time. Returns
    ``(subject, sms_text, email_body)``; the SMS text and the e-mail body are
    the same string.
    """
    if not when:
        when = now_paris()
    subject = f"[ALERTE {site}] {sonde} au-dessus du seuil"
    parts = (
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} (seuil {fmt_deg(seuil)})",
        f"Site: {site}",
        f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}",
    )
    text = "\n".join(parts)
    return subject, text, text
def build_ok_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
    """Build the texts announcing that a probe is back at or below its threshold.

    ``when`` defaults to the current Paris time. Returns
    ``(subject, sms_text, email_body)``; the SMS text and the e-mail body are
    the same string.
    """
    if not when:
        when = now_paris()
    subject = f"[OK {site}] {sonde} revenue normale"
    parts = (
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}",
        f"Site: {site}",
        f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}",
    )
    text = "\n".join(parts)
    return subject, text, text
# ========= Gyrophare MQTT =========
class MQTTPublisher:
    """Drive the site beacon by publishing "ON"/"OFF" on an MQTT topic.

    The publisher is enabled only when paho-mqtt was imported and GYRO_MODE is
    "mqtt" (case-insensitive). The broker connection is opened in ``__init__``;
    if it fails, the publisher disables itself and ``set`` becomes a no-op.
    """

    def __init__(self, site: str):
        self.enabled = (_mqtt_ok and (os.getenv("GYRO_MODE", "").lower() == "mqtt"))
        self.site = site
        # Site-specific topic first (exact case, then upper case), then the generic
        # variable, then a built-in default; the result is therefore never empty.
        self.topic = (
            os.getenv(f"GYRO_MQTT_TOPIC_{site}") or
            os.getenv(f"GYRO_MQTT_TOPIC_{site.upper()}") or
            os.getenv("GYRO_MQTT_TOPIC") or
            f"Sondes/{site}/Gyro/cmd"
        )
        # Last state confirmed as published; None until the first confirmed publish.
        self.last_state: bool | None = None
        if not self.enabled:
            log.info("Gyro MQTT désactivé (GYRO_MODE != mqtt ou paho-mqtt absent).")
            return
        host = os.getenv("MQTT_HOST", "localhost")
        port = int(os.getenv("MQTT_PORT", "1883"))
        user = os.getenv("MQTT_USER")
        pwd = os.getenv("MQTT_PASS")
        tls = (os.getenv("MQTT_TLS", "0") == "1")
        # --- MQTT client creation: works with paho 1.x and 2.x ---
        # paho 2.x exposes CallbackAPIVersion and expects it at construction;
        # paho 1.x has no such attribute and takes no such argument.
        cbver = getattr(mqtt, "CallbackAPIVersion", None)
        api_v = getattr(cbver, "VERSION2", None) if cbver is not None else None
        if api_v is not None:
            try:
                self.client = mqtt.Client(callback_api_version=api_v)
            except TypeError:
                # Client does not accept the callback_api_version argument.
                self.client = mqtt.Client()
        else:
            self.client = mqtt.Client()
        # ------------------------------------------------------------
        if user and pwd:
            self.client.username_pw_set(user, pwd)
        if tls:
            self.client.tls_set()
        try:
            self.client.connect(host, port, keepalive=30)
            self.client.loop_start()
            log.info("MQTT connecté (%s:%s), topic=%s", host, port, self.topic)
        except Exception as e:
            log.exception("MQTT connexion impossible: %s", e)
            self.enabled = False

    def set(self, on: bool):
        """Publish the beacon state if it differs from the last confirmed one.

        The payload is "ON" or "OFF", sent with QoS 2 and the retain flag.
        ``last_state`` is updated only once the publish is confirmed, so an
        unconfirmed state is sent again on the next call. Publishing errors are
        logged, never raised.
        """
        if not self.enabled:
            return
        if self.last_state is not None and self.last_state == on:
            return
        payload = "ON" if on else "OFF"
        try:
            r = self.client.publish(self.topic, payload=payload, qos=2, retain=True)
            r.wait_for_publish(timeout=3)
            if r.rc != 0:
                log.warning("MQTT publish rc=%s (topic=%s)", r.rc, self.topic)
            elif not r.is_published():
                # Timed out before the broker acknowledged: do not record the state.
                log.warning("MQTT publish non confirmé après 3s (topic=%s)", self.topic)
            else:
                log.info("Gyro %s -> %s (MQTT)", self.site, payload.upper())
                self.last_state = on
        except Exception as e:
            log.exception("MQTT publish erreur: %s", e)
# ========= Notifs haut-niveau =========
# Module-level singletons, created when the module is loaded: the notification
# helpers and run_monitor_cycle below use them.
notifier = Notifier()
beacon = MQTTPublisher(SITE)
def notifier_sur_depassement(site: str, sonde: str, temp: float, seuil: float):
    """Announce a threshold overrun: send the SMS first, then the e-mail."""
    texts = build_alert_text(site, sonde, temp, seuil)
    subject, sms_text, email_body = texts
    notifier.send_sms(sms_text)
    notifier.send_email(subject, email_body)
def notifier_acquittement(site: str, sonde: str, temp: float, seuil: float):
    """Announce a return to normal by SMS only; no e-mail is sent."""
    texts = build_ok_text(site, sonde, temp, seuil)
    _subject, sms_text, _email_body = texts
    notifier.send_sms(sms_text)
# ========= Cycle & boucle =========
def run_monitor_cycle(site: str = SITE):
    """Run one monitoring pass for a site.

    Steps:
      1. Read the latest reading of each probe and the cold-room configuration.
      2. Beacon: turn it ON as soon as one active probe that is not under
         maintenance is above its threshold, OFF otherwise. A failure in this
         step is logged and does not stop the pass.
      3. Alerts: for each reading above its threshold, open an alert once
         ``depassement_depuis_30min`` confirms the overrun; for each reading at
         or below its threshold, close the open alert. Notifications are sent
         only when an alert was actually opened or closed.

    Probes missing from the active configuration use a threshold of 6.0.
    """
    # 1) Latest readings + cold-room configuration
    last_rows = lire_sondes_depuis_db(site)   # [{'Sonde','Temperature','Date'}]
    cfg = lire_cfg_chambres(site)             # {sonde: {temp_max, active, entretien}}
    # 2) Instant beacon
    try:
        gyro_on, trigger = compute_site_alarm(last_rows, cfg, hysteresis=float(os.getenv("GYRO_HYSTERESIS", "0.0")))
        if trigger:
            s, t, se = trigger
            log.info("Gyro %s => ON (déclenché par %s: %.2f > %.2f)", site, s, t, se)
        else:
            log.info("Gyro %s => OFF (aucun dépassement)", site)
        beacon.set(gyro_on)
    except Exception as e:
        log.exception("Erreur calcul/publish gyrophare: %s", e)
    # 3) Alerts with the 30-minute delay, using the thresholds of active probes
    seuils = {s: meta["temp_max"] for s, meta in cfg.items() if meta.get("active", False)}
    table_alertes = f"Alertes_{site}"
    for r in last_rows:
        nom = str(r["Sonde"])
        temp = float(r["Temperature"])
        seuil = float(seuils.get(nom, 6.0))
        now = now_paris()
        if temp > seuil:
            if not depassement_depuis_30min(site, nom, seuil):
                continue
            # Connect before the try block: if the connection fails there is
            # nothing to close, and the real error is not masked.
            conn = get_db()
            try:
                # Notify only if an alert was really opened.
                if open_alert(conn, table_alertes, nom, now):
                    notifier_sur_depassement(site, nom, temp, seuil)
            finally:
                conn.close()
        else:
            conn = get_db()
            try:
                # Notify only if an alert was really closed.
                if close_alert(conn, table_alertes, nom):
                    notifier_acquittement(site, nom, temp, seuil)
            finally:
                conn.close()
def run_monitor_loop(site: str = SITE, period_sec: int = 300):
    """Run ``run_monitor_cycle`` forever, starting one pass every ``period_sec`` seconds.

    An exception raised by a pass is logged and the loop carries on. If a pass
    takes longer than the period, the next one starts immediately.
    """
    log.info("%s démarré (site=%s, période=%ss) ✅", PROGRAM_NAME, site, period_sec)
    while True:
        # Monotonic clock: a wall-clock adjustment must not stretch or shrink the sleep.
        started = time.monotonic()
        try:
            run_monitor_cycle(site)
        except Exception as err:
            log.exception("Erreur cycle monitoring: %s", err)
        time.sleep(max(0, period_sec - (time.monotonic() - started)))
# ========= CLI =========
if __name__ == "__main__":
    # Command-line entry point: at most one --test-* action runs (first match in
    # the order below); otherwise one pass (--once) or the endless loop.
    import argparse

    parser = argparse.ArgumentParser(description=PROGRAM_NAME)
    parser.add_argument("--period", type=int, default=300)
    for flag in ("--test-sms", "--test-mail", "--test-alert", "--test-ok", "--once"):
        parser.add_argument(flag, action="store_true")
    opts = parser.parse_args()

    if opts.test_sms:
        notifier.send_sms("TEST DOMO91 (transactionnel)")
    elif opts.test_mail:
        notifier.send_email(f"[TEST {SITE}] Mail", "OK")
    elif opts.test_alert:
        notifier_sur_depassement(SITE, "Congelateur", -14.5, -15.0)
    elif opts.test_ok:
        notifier_acquittement(SITE, "Congelateur", -15.2, -15.0)
    elif opts.once:
        run_monitor_cycle(SITE)
    else:
        run_monitor_loop(SITE, period_sec=opts.period)