510 lines
18 KiB
Python
510 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
# ========= Site =========
|
||
SITE = "Saclay"
|
||
PROGRAM_NAME = f"Monitor_{SITE}"
|
||
|
||
# ========= Imports & .env =========
|
||
import os, re, time, ssl, smtplib, logging
|
||
import datetime as dt
|
||
from email.message import EmailMessage
|
||
from datetime import datetime
|
||
from dotenv import load_dotenv, find_dotenv
|
||
load_dotenv(find_dotenv(usecwd=True), override=False)
|
||
|
||
# MySQL
|
||
import mysql.connector
|
||
from mysql.connector import Error as MySQLError
|
||
|
||
# OVH (SMS)
|
||
try:
|
||
import ovh
|
||
from ovh.exceptions import APIError as OVHAPIError
|
||
_ovh_available = True
|
||
except Exception:
|
||
ovh = None # type: ignore
|
||
class OVHAPIError(Exception): pass
|
||
_ovh_available = False
|
||
|
||
# MQTT
|
||
try:
|
||
import paho.mqtt.client as mqtt
|
||
_mqtt_ok = True
|
||
except Exception:
|
||
_mqtt_ok = False
|
||
|
||
# ========= Logger =========
|
||
# Module-wide logger, named after the program name in lower case.
log = logging.getLogger(PROGRAM_NAME.lower())
# Install the basic root configuration only when this logger has no handler
# attached directly to it.
if not log.handlers:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||
|
||
# ========= DB utils =========
|
||
def open_alert(conn, table_alertes: str, sonde: str, dt: datetime) -> bool:
    """Open one alert for a probe unless an 'En cours' alert already exists.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
        table_alertes: Name of the alerts table. It is interpolated into the
            SQL text as a back-quoted identifier, so it must come from
            trusted code, never from user input.
        sonde: Probe name, matched against / stored in the ``Sonde`` column.
        dt: Start time of the fault, written to ``Debut_defaut`` formatted as
            ``YYYY-MM-DD HH:MM:SS``. Note that this parameter name shadows
            the module-level ``dt`` alias inside this function.

    Returns:
        True if a new alert row was inserted (the caller should notify),
        False if an 'En cours' alert already existed for this probe.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"SELECT 1 FROM `{table_alertes}` WHERE Sonde=%s AND Etat='En cours' LIMIT 1",
            (sonde,),
        )
        if cur.fetchone():
            # Already open: nothing to insert, nothing to notify.
            return False

        cur.execute(
            f"INSERT INTO `{table_alertes}` (Sonde, Debut_defaut, Etat) VALUES (%s, %s, 'En cours')",
            (sonde, dt.strftime('%Y-%m-%d %H:%M:%S')),
        )
        conn.commit()
        return True
    finally:
        # Release the cursor even when execute()/commit() raises.
        cur.close()
|
||
|
||
def close_alert(conn, table_alertes: str, sonde: str) -> bool:
    """Acknowledge the most recent 'En cours' alert of a probe, if any.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
        table_alertes: Name of the alerts table. It is interpolated into the
            SQL text as a back-quoted identifier, so it must come from
            trusted code.
        sonde: Probe name matched against the ``Sonde`` column.

    Returns:
        True if exactly one row moved from 'En cours' to 'Acquitté' (the
        caller should notify), False otherwise.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"UPDATE `{table_alertes}` SET Etat='Acquitté' "
            f"WHERE Sonde=%s AND Etat='En cours' "
            f"ORDER BY Debut_defaut DESC LIMIT 1",
            (sonde,),
        )
        # LIMIT 1 means at most one row is updated.
        changed = (cur.rowcount == 1)
        conn.commit()
        return changed
    finally:
        # Release the cursor even when execute()/commit() raises.
        cur.close()
|
||
|
||
def get_db():
    """Open and return a new MySQL connection configured from the environment.

    Reads DB_HOST, DB_USER, DB_PASS, DB_NAME (default "Sondes") and DB_PORT
    (default "3306"). The connection is created with autocommit enabled.
    The caller is responsible for closing it.
    """
    env = os.getenv
    settings = {
        "host": env("DB_HOST"),
        "user": env("DB_USER"),
        "password": env("DB_PASS"),
        "database": env("DB_NAME", "Sondes"),
        "port": int(env("DB_PORT", "3306")),
        "autocommit": True,
    }
    return mysql.connector.connect(**settings)
|
||
|
||
def lire_sondes_depuis_db(site: str):
    """Return the latest reading of every probe stored in the site's table.

    The table is named after ``site``; the name is interpolated into the SQL
    as a back-quoted identifier, so it must come from trusted code.

    Args:
        site: Site name, used as the readings table name.

    Returns:
        A list of dict rows with keys ``Sonde``, ``Temperature`` (converted
        to ``float``) and ``Date``. Rows whose temperature cannot be
        converted (e.g. NULL) are skipped with a warning so that one bad
        reading does not abort the processing of the other probes.
        An empty list is returned on a MySQL error (the error is logged).
    """
    table = site
    sql = f"""
        SELECT t1.Sonde, t1.Temperature, t1.Date
        FROM `{table}` t1
        JOIN (
            SELECT Sonde, MAX(Date) AS MaxDate
            FROM `{table}`
            GROUP BY Sonde
        ) t2 ON t1.Sonde=t2.Sonde AND t1.Date=t2.MaxDate
    """
    cnx = get_db()
    try:
        cur = cnx.cursor(dictionary=True)
        try:
            cur.execute(sql)
            rows = cur.fetchall()
        finally:
            cur.close()

        valides = []
        for r in rows:
            try:
                r["Temperature"] = float(r["Temperature"])
            except (TypeError, ValueError):
                # NULL or non-numeric value: ignore this reading only.
                log.warning(
                    "Température invalide ignorée pour la sonde %s: %r",
                    r.get("Sonde"), r.get("Temperature"),
                )
                continue
            valides.append(r)
        return valides
    except MySQLError as err:
        log.exception("Erreur DB (lire_sondes_depuis_db): %s", err)
        return []
    finally:
        cnx.close()
|
||
|
||
def lire_seuils_depuis_db(site: str):
    """Load the maximum-temperature threshold of each active cold room.

    Selects ``Sonde`` and ``Temp_Max`` from ``Chambres_froides`` for rows
    whose ``Lieu`` equals ``site`` and whose ``Etat`` is 'ON'.

    Returns:
        A dict mapping probe name (as ``str``) to threshold (as ``float``).
        An empty dict is returned on a MySQL error (the error is logged).
    """
    query = """
        SELECT Sonde, Temp_Max
        FROM Chambres_froides
        WHERE Lieu=%s AND Etat='ON'
    """
    cnx = get_db()
    try:
        cursor = cnx.cursor()
        cursor.execute(query, (site,))
        return {str(nom): float(valeur) for nom, valeur in cursor.fetchall()}
    except MySQLError as err:
        log.exception("Erreur DB (lire_seuils_depuis_db): %s", err)
        return {}
    finally:
        cnx.close()
|
||
|
||
def depassement_depuis_30min(
    site: str,
    sonde: str,
    seuil: float,
    duree_min: int = 30,
    fenetre_min: int = 120,
) -> bool:
    """Tell whether a probe has been above its threshold for long enough.

    The check has two steps:
      1. the most recent reading of the probe must be strictly above
         ``seuil``;
      2. the earliest above-threshold reading found in the last
         ``fenetre_min`` minutes (database ``NOW()``) must be at least
         ``duree_min`` minutes old (Python clock).

    NOTE(review): step 2 does not verify that the temperature stayed above
    the threshold between that first reading and now; an isolated excursion
    inside the window followed by a new one is treated as a single long
    excursion. Confirm this is the intended behavior.

    Args:
        site: Site name, used as the readings table name (interpolated as a
            back-quoted identifier; must come from trusted code).
        sonde: Probe name.
        seuil: Threshold temperature.
        duree_min: Minimum age, in minutes, of the first above-threshold
            reading. Defaults to 30 (the historical hard-coded value).
        fenetre_min: Look-back window, in minutes. Defaults to 120 (the
            historical hard-coded value).

    Returns:
        True when both conditions hold; False otherwise, including when the
        probe has no reading or when a MySQL error occurs (logged).
    """
    table = site
    seuil = float(seuil)
    # int() guarantees only a plain integer is interpolated into the SQL text.
    fenetre = int(fenetre_min)
    cnx = get_db()
    try:
        cur = cnx.cursor()
        try:
            cur.execute(f"""
                SELECT Temperature, Date
                FROM `{table}`
                WHERE Sonde=%s
                ORDER BY Date DESC
                LIMIT 1
            """, (sonde,))
            last = cur.fetchone()
            if not last:
                return False
            if float(last[0]) <= seuil:
                return False

            cur.execute(f"""
                SELECT MIN(Date)
                FROM `{table}`
                WHERE Sonde=%s
                  AND Temperature > %s
                  AND Date >= (NOW() - INTERVAL {fenetre} MINUTE)
            """, (sonde, seuil))
            first_over = cur.fetchone()[0]
        finally:
            cur.close()

        if not first_over:
            return False

        # Build "now" with the same tz-awareness as the value returned by the
        # driver so the subtraction never mixes naive and aware datetimes.
        now = dt.datetime.now(tz=getattr(first_over, "tzinfo", None))
        return (now - first_over) >= dt.timedelta(minutes=duree_min)
    except MySQLError as err:
        log.exception("Erreur DB (depassement_depuis_30min): %s", err)
        return False
    finally:
        cnx.close()
|
||
|
||
def any_alert_open(site: str) -> bool:
    """Tell whether the site's alerts table holds at least one 'En cours' row.

    The table queried is ``Alertes_<site>``.

    Returns:
        True if such a row exists; False if none exists or if a MySQL error
        occurs (the error is logged).
    """
    cnx = get_db()
    try:
        cursor = cnx.cursor()
        cursor.execute(f"SELECT 1 FROM `Alertes_{site}` WHERE `Etat`='En cours' LIMIT 1")
        row = cursor.fetchone()
    except MySQLError as err:
        log.exception("Erreur DB (any_alert_open): %s", err)
        return False
    else:
        return row is not None
    finally:
        cnx.close()
|
||
# ========= Helpers listes/numéros =========
|
||
def _split_list(raw: str | None) -> list[str]:
|
||
return [x.strip() for x in re.split(r"[;,]", raw or "") if x.strip()]
|
||
|
||
def _parse_labeled_phones(raw: str | None) -> list[tuple[str, str]]:
|
||
out: list[tuple[str, str]] = []
|
||
for tok in re.split(r"[;,]", raw or ""):
|
||
tok = tok.strip()
|
||
if not tok:
|
||
continue
|
||
if ":" in tok:
|
||
name, num = tok.split(":", 1)
|
||
out.append((name.strip(), num.strip()))
|
||
else:
|
||
out.append(("", tok))
|
||
return out
|
||
|
||
def _resolve_sms_receivers(labeled: list[tuple[str, str]]) -> list[str]:
    """Return the phone numbers to use, honoring ALERT_SMS_ONLY.

    When the ALERT_SMS_ONLY environment variable is unset or empty, every
    number is returned. Otherwise it is read as a ';' or ',' separated
    allow-list, and an entry is kept when its name or its number is listed.
    """
    restriction = os.getenv("ALERT_SMS_ONLY")
    if restriction:
        allowed = {
            item.strip()
            for item in re.split(r"[;,]", restriction)
            if item.strip()
        }
        return [
            phone
            for label, phone in labeled
            if phone in allowed or (label and label in allowed)
        ]
    return [phone for _label, phone in labeled]
|
||
|
||
def _human_labeled_list(labeled: list[tuple[str, str]]) -> str:
    """Render ``(name, number)`` pairs as a comma-separated string for logs.

    Named entries are shown as ``name(number)``; unnamed ones as the bare
    number.
    """
    rendered = []
    for label, phone in labeled:
        if label:
            rendered.append(f"{label}({phone})")
        else:
            rendered.append(phone)
    return ", ".join(rendered)
|
||
|
||
# ========= Notifier (SMS + Mail) =========
|
||
class Notifier:
    """Send notifications by SMS (OVH API) and by e-mail (SMTP).

    All configuration is read from environment variables when the instance
    is created:

    * SMS is enabled only when the ``ovh`` package was imported and
      OVH_APPLICATION_KEY, OVH_APPLICATION_SECRET, OVH_CONSUMER_KEY,
      OVH_SMS_SERVICE and OVH_SMS_SENDER are all set. Recipients come from
      ALERT_SMS_TO_<SITE>, ALERT_SMS_TO_<SITE upper-cased> or ALERT_SMS_TO.
    * E-mail is enabled only when SMTP_HOST, SMTP_USER, SMTP_PASS and at
      least one recipient (MAIL_TO_<SITE>, MAIL_TO_<SITE upper-cased> or
      MAIL_TO) are set.

    Site-specific variable names are derived from the module-level ``SITE``
    constant so that changing the site does not silently keep another
    site's recipients.
    """

    def __init__(self):
        # --- OVH SMS ---
        self.ovh_enabled = _ovh_available and all(
            os.getenv(k) for k in ("OVH_APPLICATION_KEY","OVH_APPLICATION_SECRET","OVH_CONSUMER_KEY","OVH_SMS_SERVICE","OVH_SMS_SENDER")
        )
        if self.ovh_enabled:
            self.ovh_client = ovh.Client(
                endpoint=os.getenv("OVH_ENDPOINT","ovh-eu"),
                application_key=os.getenv("OVH_APPLICATION_KEY"),
                application_secret=os.getenv("OVH_APPLICATION_SECRET"),
                consumer_key=os.getenv("OVH_CONSUMER_KEY"),
            )
            self.ovh_service = os.getenv("OVH_SMS_SERVICE")
            self.ovh_sender = os.getenv("OVH_SMS_SENDER")
            self.sms_labeled = _parse_labeled_phones(self._site_env("ALERT_SMS_TO"))
        else:
            self.sms_labeled = []

        # --- SMTP ---
        self.smtp_host = os.getenv("SMTP_HOST")
        # "or" (rather than a getenv default) also covers a variable that is
        # defined but empty, which would otherwise crash int().
        self.smtp_port = int(os.getenv("SMTP_PORT") or "465")
        self.smtp_user = os.getenv("SMTP_USER")
        self.smtp_pass = os.getenv("SMTP_PASS")
        self.smtp_security = (os.getenv("SMTP_SECURITY","SSL") or "SSL").upper()

        self.mail_to = _split_list(self._site_env("MAIL_TO") or "")
        self.mail_from = self._site_env("MAIL_FROM") or self.smtp_user

        self.smtp_enabled = all([self.smtp_host, self.smtp_port, self.smtp_user, self.smtp_pass, self.mail_to])

    @staticmethod
    def _site_env(prefix: str) -> str | None:
        """Return the first non-empty value among the site-specific and generic variables.

        Looks up ``<prefix>_<SITE>``, then ``<prefix>_<SITE upper-cased>``,
        then ``<prefix>``; returns None when none of them is set.
        """
        for name in (f"{prefix}_{SITE}", f"{prefix}_{SITE.upper()}", prefix):
            value = os.getenv(name)
            if value:
                return value
        return None

    def send_sms(self, message: str, tag: str = f"monitor-{SITE.lower()}") -> bool:
        """Send ``message`` by SMS to the configured recipients.

        Args:
            message: Text to send; truncated to 1600 characters.
            tag: Tag attached to the OVH SMS job.

        Returns:
            True when the OVH job was submitted, False when SMS is disabled,
            no recipient remains after filtering, or the submission failed
            (failures are logged, never raised).
        """
        if not self.ovh_enabled or not self.sms_labeled:
            log.warning("SMS désactivé ou aucun destinataire.")
            return False
        receivers = _resolve_sms_receivers(self.sms_labeled)
        if not receivers:
            log.warning("ALERT_SMS_ONLY filtre tous les destinataires (aucun envoi).")
            return False

        payload = {
            "sender": self.ovh_sender,
            "receivers": receivers,
            "message": message[:1600],
            "priority": "high",
            "coding": "7bit",
            "class": "phoneDisplay",
            "noStopClause": True,
            "senderForResponse": False,
            "validityPeriod": 2880,
            "tag": tag,
        }
        try:
            log.info("Envoi SMS vers: %s", _human_labeled_list([(n, p) for (n, p) in self.sms_labeled if p in receivers]))
            resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
            ids = resp.get("ids") or []
            log.info("SMS OVH envoyé (job ids=%s)", ids)
            # Best-effort status polling of the first job: a failure here does
            # not change the return value, the job has already been accepted.
            try:
                if ids:
                    job_id = ids[0]
                    for _ in range(3):
                        job = self.ovh_client.get(f"/sms/{self.ovh_service}/jobs/{job_id}")
                        if job.get("status") in ("done","error","cancelled"):
                            log.info("Statut job SMS: %s", job.get("status"))
                            break
                        time.sleep(1.5)
            except Exception as e:
                log.debug("Suivi job OVH indisponible (OK): %s", e)
            return True
        except OVHAPIError as err:
            log.exception("Erreur API OVH: %s", err)
            return False
        except Exception as err:
            log.exception("Echec envoi SMS OVH: %s", err)
            return False

    def send_email(self, subject: str, body: str) -> bool:
        """Send a plain-text e-mail to the configured recipients.

        With SMTP_SECURITY=STARTTLS the message is sent through STARTTLS on
        SMTP_PORT, falling back to implicit TLS on port 465 if the connection
        is dropped, refused or times out. Any other SMTP_SECURITY value uses
        implicit TLS on port 465 directly.

        NOTE(review): the implicit-TLS path always uses port 465 and ignores
        SMTP_PORT — confirm this is intended.

        Returns:
            True when the message was handed to the server, False when SMTP
            is not configured or sending failed (failures are logged, never
            raised).
        """
        if not self.smtp_enabled:
            log.warning("SMTP non configuré, email non envoyé.")
            return False

        msg = EmailMessage()
        msg["From"] = self.mail_from
        msg["To"] = ", ".join(self.mail_to)
        msg["Subject"] = subject
        msg.set_content(body)

        # Same empty-value tolerance as SMTP_PORT.
        timeout = int(os.getenv("SMTP_TIMEOUT") or "60")
        debug = os.getenv("SMTP_DEBUG","0") == "1"

        def _send_ssl():
            with smtplib.SMTP_SSL(self.smtp_host, 465, context=ssl.create_default_context(), timeout=timeout) as server:
                if debug:
                    server.set_debuglevel(1)
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        def _send_starttls():
            with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=timeout) as server:
                if debug:
                    server.set_debuglevel(1)
                server.ehlo()
                server.starttls(context=ssl.create_default_context())
                server.ehlo()
                server.login(self.smtp_user, self.smtp_pass)
                server.send_message(msg)

        try:
            if self.smtp_security == "STARTTLS":
                try:
                    _send_starttls()
                except (smtplib.SMTPServerDisconnected, TimeoutError, smtplib.SMTPConnectError) as err:
                    log.warning("STARTTLS/587 a échoué (%s). Tentative en SSL/465...", err)
                    _send_ssl()
            else:
                _send_ssl()
            log.info("Email envoyé à %s", self.mail_to)
            return True
        except (smtplib.SMTPException, ssl.SSLError, TimeoutError) as err:
            log.exception("Erreur SMTP: %s", err)
            return False
        except Exception as err:
            log.exception("Echec envoi email: %s", err)
            return False
|
||
|
||
# ========= Mise en forme messages =========
|
||
from zoneinfo import ZoneInfo
|
||
PARIS = ZoneInfo("Europe/Paris")
|
||
|
||
def fmt_deg(v: float) -> str:
    """Format a temperature with one decimal, a decimal comma and a '°C' suffix."""
    texte = format(float(v), ".1f")
    return texte.replace(".", ",") + "°C"
|
||
|
||
def now_paris() -> dt.datetime:
    """Return the current time as a timezone-aware datetime in the PARIS zone."""
    return dt.datetime.now(PARIS)
|
||
|
||
def build_alert_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
    """Build the subject and message body announcing a threshold excess.

    Args:
        site: Site name shown in the subject and body.
        sonde: Probe name.
        temp: Measured temperature.
        seuil: Threshold temperature.
        when: Timestamp to print; defaults to the current Paris time.

    Returns:
        A ``(subject, sms_text, email_body)`` tuple; the SMS text and the
        e-mail body are the same string.
    """
    if not when:
        when = now_paris()
    subject = f"[ALERTE {site}] {sonde} au-dessus du seuil"
    body = "\n".join((
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} (seuil {fmt_deg(seuil)})",
        f"Site: {site}",
        f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}",
    ))
    return subject, body, body
|
||
|
||
def build_ok_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
    """Build the subject and message body announcing a return to normal.

    Args:
        site: Site name shown in the subject and body.
        sonde: Probe name.
        temp: Measured temperature.
        seuil: Threshold temperature.
        when: Timestamp to print; defaults to the current Paris time.

    Returns:
        A ``(subject, sms_text, email_body)`` tuple; the SMS text and the
        e-mail body are the same string.
    """
    if not when:
        when = now_paris()
    subject = f"[OK {site}] {sonde} revenue normale"
    body = "\n".join((
        subject + ":",
        f"Sonde: {sonde}",
        f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}",
        f"Site: {site}",
        f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}",
    ))
    return subject, body, body
|
||
|
||
# ========= Gyrophare MQTT =========
|
||
class MQTTPublisher:
    """Drive a warning beacon by publishing "on"/"off" on an MQTT topic.

    The publisher is active only when paho-mqtt was imported, GYRO_MODE
    equals "mqtt" (case-insensitive), a topic is configured through
    GYRO_MQTT_TOPIC_<site> (or its capitalized variant) and the initial
    broker connection succeeds. Otherwise ``set`` is a no-op.
    """

    def __init__(self, site: str):
        mode_is_mqtt = os.getenv("GYRO_MODE", "").lower() == "mqtt"
        self.enabled = (_mqtt_ok and mode_is_mqtt)
        self.site = site
        self.topic = (
            os.getenv(f"GYRO_MQTT_TOPIC_{site}")
            or os.getenv(f"GYRO_MQTT_TOPIC_{site.capitalize()}")
        )
        # Last state successfully published; None until the first publish.
        self.last_state: bool | None = None

        if not self.enabled:
            log.info("Gyro MQTT désactivé (GYRO_MODE != mqtt ou paho-mqtt absent).")
            return
        if not self.topic:
            log.warning("Topic MQTT manquant pour %s (GYRO_MQTT_TOPIC_%s)", site, site)
            self.enabled = False
            return

        # Broker settings (read before the client is created).
        host = os.getenv("MQTT_HOST", "localhost")
        port = int(os.getenv("MQTT_PORT", "1883"))
        user = os.getenv("MQTT_USER")
        pwd = os.getenv("MQTT_PASS")
        tls = (os.getenv("MQTT_TLS", "0") == "1")

        self.client = self._new_client()

        if user and pwd:
            self.client.username_pw_set(user, pwd)
        if tls:
            self.client.tls_set()

        try:
            self.client.connect(host, port, keepalive=30)
            self.client.loop_start()
            log.info("MQTT connecté (%s:%s), topic=%s", host, port, self.topic)
        except Exception as exc:
            # A failed initial connection disables the beacon for this instance.
            log.exception("MQTT connexion impossible: %s", exc)
            self.enabled = False

    @staticmethod
    def _new_client():
        """Create a paho client, with or without ``callback_api_version``.

        When the library exposes ``CallbackAPIVersion``, the first available
        constant among VERSION2, V5, v5 and V311 is passed to the
        constructor; a TypeError falls back to the argument-less constructor.
        """
        versions = getattr(mqtt, "CallbackAPIVersion", None)
        if versions is None:
            # Library without CallbackAPIVersion: plain constructor.
            return mqtt.Client()

        chosen = None
        for attr in ("VERSION2", "V5", "v5", "V311"):
            chosen = getattr(versions, attr, None)
            if chosen:
                break
        try:
            if chosen:
                return mqtt.Client(callback_api_version=chosen)
            return mqtt.Client()
        except TypeError:
            # Constructor does not accept callback_api_version.
            return mqtt.Client()

    def set(self, on: bool):
        """Publish the beacon state if it differs from the last published one.

        The payload is "on" or "off", published with QoS 1 and the retain
        flag. ``last_state`` is updated only when the publish call reports
        rc == 0. Errors are logged, never raised.
        """
        if not self.enabled:
            return
        unchanged = self.last_state is not None and self.last_state == on
        if unchanged:
            return

        if on:
            payload = "on"
        else:
            payload = "off"
        try:
            info = self.client.publish(self.topic, payload=payload, qos=1, retain=True)
            info.wait_for_publish(timeout=3)
            if info.rc == 0:
                log.info("Gyro %s -> %s (MQTT)", self.site, payload.upper())
                self.last_state = on
            else:
                log.warning("MQTT publish rc=%s (topic=%s)", info.rc, self.topic)
        except Exception as exc:
            log.exception("MQTT publish erreur: %s", exc)
|
||
|
||
|
||
# ========= Notifs haut-niveau =========
|
||
# Module-level singletons, created at import time: the SMS/e-mail notifier
# and the MQTT beacon publisher for the configured SITE.
notifier = Notifier()
beacon = MQTTPublisher(SITE)
|
||
|
||
def notifier_sur_depassement(site: str, sonde: str, temp: float, seuil: float):
    """Notify a threshold excess by SMS and e-mail, then switch the beacon on.

    Args:
        site: Site name used in the messages.
        sonde: Probe name.
        temp: Measured temperature.
        seuil: Threshold temperature.
    """
    subject, sms_text, email_body = build_alert_text(site, sonde, temp, seuil)
    notifier.send_sms(sms_text)
    notifier.send_email(subject, email_body)
    try:
        beacon.set(True)
    except Exception as err:
        # The beacon is best-effort: never let it break the notification
        # path, but leave a trace instead of failing silently.
        log.exception("Activation du gyrophare impossible: %s", err)
|
||
|
||
def notifier_acquittement(site: str, sonde: str, temp: float, seuil: float):
    """Notify a return to normal by SMS and switch the beacon off if possible.

    The beacon is switched off only when no alert is still 'En cours' for
    the site.

    NOTE(review): unlike the excess notification, no e-mail is sent here —
    confirm this is intended.

    Args:
        site: Site name used in the message.
        sonde: Probe name.
        temp: Measured temperature.
        seuil: Threshold temperature.
    """
    _subject, sms_text, _body = build_ok_text(site, sonde, temp, seuil)
    notifier.send_sms(sms_text)
    try:
        if not any_alert_open(site):
            beacon.set(False)
    except Exception as err:
        # The beacon is best-effort: never let it break the notification
        # path, but leave a trace instead of failing silently.
        log.exception("Extinction du gyrophare impossible: %s", err)
|
||
|
||
# ========= Cycle & boucle =========
|
||
def run_monitor_cycle(site: str = SITE):
    """Run one monitoring pass over every probe of the site.

    For each probe's latest reading:
      * above threshold and ``depassement_depuis_30min`` is true: open an
        alert and notify only if the alert was actually created;
      * at or below threshold: acknowledge the open alert and notify only if
        one was actually closed.

    NOTE(review): a probe without an entry in the thresholds dict (which only
    holds cold rooms whose Etat is 'ON') is still checked against the 6.0
    default — confirm this is intended.

    Args:
        site: Site name; also the readings table name and the suffix of the
            ``Alertes_<site>`` table.
    """
    sondes = lire_sondes_depuis_db(site)
    seuils = lire_seuils_depuis_db(site)
    table_alertes = f"Alertes_{site}"

    for r in sondes:
        nom = str(r["Sonde"])
        temp = float(r["Temperature"])
        seuil = float(seuils.get(nom, 6.0))

        if temp > seuil:
            if not depassement_depuis_30min(site, nom, seuil):
                continue
            # Acquire the connection BEFORE the try block: if get_db() fails
            # there is nothing to close, and the real error must propagate
            # instead of an UnboundLocalError raised from the finally clause.
            conn = get_db()
            try:
                if open_alert(conn, table_alertes, nom, now_paris()):
                    notifier_sur_depassement(site, nom, temp, seuil)
            finally:
                conn.close()
        else:
            conn = get_db()
            try:
                if close_alert(conn, table_alertes, nom):
                    notifier_acquittement(site, nom, temp, seuil)
            finally:
                conn.close()
|
||
|
||
|
||
def run_monitor_loop(site: str = SITE, period_sec: int = 300):
    """Run monitoring cycles forever, one every ``period_sec`` seconds.

    Any exception raised by a cycle is logged and the loop continues. The
    sleep is shortened by the time the cycle took, so cycles start at a
    regular pace; a cycle longer than the period is followed immediately by
    the next one.

    Args:
        site: Site to monitor.
        period_sec: Target delay, in seconds, between two cycle starts.
    """
    log.info("%s démarré (site=%s, période=%ss) ✅", PROGRAM_NAME, site, period_sec)
    while True:
        # Monotonic clock: the elapsed time is immune to wall-clock
        # adjustments (NTP step, manual change) that would otherwise
        # produce a wrong sleep duration.
        t0 = time.monotonic()
        try:
            run_monitor_cycle(site)
        except Exception as err:
            log.exception("Erreur cycle monitoring: %s", err)
        time.sleep(max(0, period_sec - (time.monotonic() - t0)))
|
||
|
||
# ========= CLI =========
|
||
if __name__ == "__main__":
    # Command-line entry point: one-shot notification tests, a single
    # monitoring cycle (--once), or the endless monitoring loop (default).
    import argparse

    parser = argparse.ArgumentParser(description=PROGRAM_NAME)
    parser.add_argument("--period", type=int, default=300)
    for flag in ("--test-sms", "--test-mail", "--test-alert", "--test-ok", "--once"):
        parser.add_argument(flag, action="store_true")
    args = parser.parse_args()

    # The test flags are mutually exclusive by precedence: the first one set
    # wins and nothing else runs.
    if args.test_sms:
        notifier.send_sms("TEST DOMO91 (transactionnel)")
    elif args.test_mail:
        notifier.send_email(f"[TEST {SITE}] Mail", "OK")
    elif args.test_alert:
        notifier_sur_depassement(SITE, "Congelateur", -14.5, -15.0)
    elif args.test_ok:
        notifier_acquittement(SITE, "Congelateur", -15.2, -15.0)
    elif args.once:
        run_monitor_cycle(SITE)
    else:
        run_monitor_loop(SITE, period_sec=args.period)
|