#!/usr/bin/env python3 # -*- coding: utf-8 -*- SITE = "Meudon" PROGRAM_NAME = f"Monitor_{SITE}" import os, re, time, ssl, smtplib, logging import datetime as dt from email.message import EmailMessage from typing import List from dotenv import load_dotenv, find_dotenv load_dotenv(find_dotenv(usecwd=True), override=False) import mysql.connector from mysql.connector import Error as MySQLError try: import ovh from ovh.exceptions import APIError as OVHAPIError _ovh_available = True except Exception: ovh = None # type: ignore class OVHAPIError(Exception): pass _ovh_available = False try: import paho.mqtt.client as mqtt _mqtt_ok = True except Exception: _mqtt_ok = False log = logging.getLogger(PROGRAM_NAME.lower()) if not log.handlers: logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") def get_db(): cnx = mysql.connector.connect( host=os.getenv("DB_HOST", "localhost"), user=os.getenv("DB_USER"), password=os.getenv("DB_PASS"), database=os.getenv("DB_NAME", "Sondes"), port=int(os.getenv("DB_PORT", "3306")), autocommit=True, ) return cnx def lire_sondes_depuis_db(site: str): table = site sql = f""" SELECT t1.Sonde, t1.Temperature, t1.Date FROM `{table}` t1 JOIN ( SELECT Sonde, MAX(Date) AS MaxDate FROM `{table}` GROUP BY Sonde ) t2 ON t1.Sonde=t2.Sonde AND t1.Date=t2.MaxDate """ cnx = get_db() try: cur = cnx.cursor(dictionary=True) cur.execute(sql) rows = cur.fetchall() for r in rows: r["Temperature"] = float(r["Temperature"]) return rows except MySQLError as err: log.exception("Erreur DB (lire_sondes_depuis_db): %s", err) return [] finally: cnx.close() def lire_seuils_depuis_db(site: str): sql = """ SELECT Sonde, Temp_Max FROM Chambres_froides WHERE Lieu=%s AND Etat='ON' """ cnx = get_db() seuils = {} try: cur = cnx.cursor() cur.execute(sql, (site,)) for sonde, s in cur.fetchall(): seuils[str(sonde)] = float(s) return seuils except MySQLError as err: log.exception("Erreur DB (lire_seuils_depuis_db): %s", err) return seuils finally: cnx.close() def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool: table = site cnx = get_db() try: cur = cnx.cursor() cur.execute(f""" SELECT Temperature, Date FROM `{table}` WHERE Sonde=%s ORDER BY Date DESC LIMIT 1 """, (sonde,)) last = cur.fetchone() if not last: return False last_temp, last_date = float(last[0]), last[1] if last_temp <= float(seuil): return False cur.execute(f""" SELECT MIN(Date) FROM `{table}` WHERE Sonde=%s AND Temperature > %s AND Date >= (NOW() - INTERVAL 120 MINUTE) """, (sonde, float(seuil))) first_over = cur.fetchone()[0] if not first_over: return False now = dt.datetime.now(tz=getattr(first_over, "tzinfo", None)) return (now - first_over) >= dt.timedelta(minutes=30) except MySQLError as err: log.exception("Erreur DB (depassement_depuis_30min): %s", err); return False finally: cnx.close() def alerte_en_cours(site: str, sonde: str) -> bool: table = f"Alertes_{site}" cnx = get_db() try: cur = cnx.cursor() cur.execute(f"SELECT 1 FROM `{table}` WHERE `Sonde`=%s AND `Etat`='En cours' LIMIT 1", (sonde,)) return cur.fetchone() is not None except MySQLError as err: log.exception("Erreur DB (alerte_en_cours): %s", err); return False finally: cnx.close() def any_alert_open(site: str) -> bool: table = f"Alertes_{site}" cnx = get_db() try: cur = cnx.cursor() cur.execute(f"SELECT 1 FROM `{table}` WHERE `Etat`='En cours' LIMIT 1") return cur.fetchone() is not None except MySQLError as err: log.exception("Erreur DB (any_alert_open): %s", err); return False finally: cnx.close() def creer_alerte(site: str, sonde: str): table = f"Alertes_{site}" cnx = get_db() try: cur = cnx.cursor() cur.execute(f"INSERT INTO `{table}` (`Sonde`, `Debut_defaut`, `Etat`) VALUES (%s, NOW(), 'En cours')", (sonde,)) cnx.commit() except MySQLError as err: log.exception("Erreur DB (creer_alerte): %s", err) finally: cnx.close() def acquitter_alerte(site: str, sonde: str): table = f"Alertes_{site}" cnx = get_db() try: cur = cnx.cursor() cur.execute(f"UPDATE `{table}` SET `Etat`='Acquitté' WHERE `Sonde`=%s AND `Etat`='En cours'", (sonde,)) cnx.commit() except MySQLError as err: log.exception("Erreur DB (acquitter_alerte): %s", err) finally: cnx.close() def _split_list(raw: str | None) -> list[str]: return [x.strip() for x in re.split(r"[;,]", raw or "") if x.strip()] def _parse_labeled_phones(raw: str | None) -> list[tuple[str, str]]: out: list[tuple[str, str]] = [] for tok in re.split(r"[;,]", raw or ""): tok = tok.strip() if not tok: continue if ":" in tok: name, num = tok.split(":", 1); out.append((name.strip(), num.strip())) else: out.append(("", tok)) return out def _resolve_sms_receivers(labeled: list[tuple[str, str]]) -> list[str]: only = os.getenv("ALERT_SMS_ONLY") if not only: return [num for (_n,num) in labeled] allow = {x.strip() for x in re.split(r"[;,]", only) if x.strip()} return [num for (name,num) in labeled if (name and name in allow) or (num in allow)] def _human_labeled_list(labeled: list[tuple[str, str]]) -> str: return ", ".join([f"{n}({p})" if n else p for n,p in labeled]) class Notifier: def __init__(self): self.ovh_enabled = _ovh_available and all( os.getenv(k) for k in ("OVH_APPLICATION_KEY","OVH_APPLICATION_SECRET","OVH_CONSUMER_KEY","OVH_SMS_SERVICE","OVH_SMS_SENDER") ) if self.ovh_enabled: self.ovh_client = ovh.Client( endpoint=os.getenv("OVH_ENDPOINT","ovh-eu"), application_key=os.getenv("OVH_APPLICATION_KEY"), application_secret=os.getenv("OVH_APPLICATION_SECRET"), consumer_key=os.getenv("OVH_CONSUMER_KEY"), ) self.ovh_service = os.getenv("OVH_SMS_SERVICE") self.ovh_sender = os.getenv("OVH_SMS_SENDER") raw_sms = (os.getenv("ALERT_SMS_TO_Meudon") or os.getenv("ALERT_SMS_TO_MEUDON") or os.getenv("ALERT_SMS_TO")) self.sms_labeled = _parse_labeled_phones(raw_sms) else: self.sms_labeled = [] self.smtp_host = os.getenv("SMTP_HOST") self.smtp_port = int(os.getenv("SMTP_PORT","465")) self.smtp_user = os.getenv("SMTP_USER") self.smtp_pass = os.getenv("SMTP_PASS") self.smtp_security = (os.getenv("SMTP_SECURITY","SSL") or "SSL").upper() raw_mail_to = (os.getenv("MAIL_TO_Meudon") or os.getenv("MAIL_TO_MEUDON") or os.getenv("MAIL_TO") or "") self.mail_to = _split_list(raw_mail_to) self.mail_from = (os.getenv("MAIL_FROM_Meudon") or os.getenv("MAIL_FROM_MEUDON") or os.getenv("MAIL_FROM") or self.smtp_user) self.smtp_enabled = all([self.smtp_host, self.smtp_port, self.smtp_user, self.smtp_pass, self.mail_to]) def send_sms(self, message: str, tag: str = f"monitor-{SITE.lower()}") -> bool: if not self.ovh_enabled or not self.sms_labeled: log.warning("SMS désactivé ou aucun destinataire."); return False receivers = _resolve_sms_receivers(self.sms_labeled) if not receivers: log.warning("ALERT_SMS_ONLY filtre tous les destinataires."); return False payload = { "sender": self.ovh_sender, "receivers": receivers, "message": message[:1600], "priority": "high", "coding": "7bit", "class": "phoneDisplay", "noStopClause": True, "senderForResponse": False, "validityPeriod": 2880, "tag": tag, } try: log.info("Envoi SMS vers: %s", _human_labeled_list([(n,p) for (n,p) in self.sms_labeled if p in receivers])) resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload) ids = resp.get("ids") or []; log.info("SMS OVH envoyé (job ids=%s)", ids) try: if ids: job_id = ids[0] for _ in range(3): job = self.ovh_client.get(f"/sms/{self.ovh_service}/jobs/{job_id}") if job.get("status") in ("done","error","cancelled"): log.info("Statut job SMS: %s", job.get("status")); break time.sleep(1.5) except Exception as e: log.debug("Suivi job OVH indisponible (OK): %s", e) return True except OVHAPIError as err: log.exception("Erreur API OVH: %s", err); return False except Exception as err: log.exception("Echec envoi SMS OVH: %s", err); return False def send_email(self, subject: str, body: str) -> bool: if not self.smtp_enabled: log.warning("SMTP non configuré, email non envoyé."); return False msg = EmailMessage(); msg["From"]=self.mail_from; msg["To"]=", ".join(self.mail_to); msg["Subject"]=subject; msg.set_content(body) timeout = int(os.getenv("SMTP_TIMEOUT","60")); debug = os.getenv("SMTP_DEBUG","0")=="1" def _send_ssl(): with smtplib.SMTP_SSL(self.smtp_host,465,context=ssl.create_default_context(),timeout=timeout) as s: if debug: s.set_debuglevel(1); s.login(self.smtp_user,self.smtp_pass); s.send_message(msg) def _send_starttls(): with smtplib.SMTP(self.smtp_host,self.smtp_port,timeout=timeout) as s: if debug: s.set_debuglevel(1); s.ehlo(); s.starttls(context=ssl.create_default_context()); s.ehlo() s.login(self.smtp_user,self.smtp_pass); s.send_message(msg) try: if self.smtp_security=="STARTTLS": try: _send_starttls() except (smtplib.SMTPServerDisconnected, TimeoutError, smtplib.SMTPConnectError) as err: log.warning("STARTTLS/587 a échoué (%s). Tentative SSL/465...", err); _send_ssl() else: _send_ssl() log.info("Email envoyé à %s", self.mail_to); return True except (smtplib.SMTPException, ssl.SSLError, TimeoutError) as err: log.exception("Erreur SMTP: %s", err); return False except Exception as err: log.exception("Echec envoi email: %s", err); return False from zoneinfo import ZoneInfo PARIS = ZoneInfo("Europe/Paris") def fmt_deg(v: float) -> str: s=f"{float(v):.1f}".replace(".", ","); return f"{s}°C" def now_paris()->dt.datetime: return dt.datetime.now(tz=PARIS) def build_alert_text(site,sonde,temp,seuil,when=None): when=when or now_paris(); subject=f"[ALERTE {site}] {sonde} au-dessus du seuil" lines=[subject+":",f"Sonde: {sonde}",f"Température: {fmt_deg(temp)} (seuil {fmt_deg(seuil)})",f"Site: {site}",f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}"] txt="\n".join(lines); return subject,txt,txt def build_ok_text(site,sonde,temp,seuil,when=None): when=when or now_paris(); subject=f"[OK {site}] {sonde} revenue normale" lines=[subject+":",f"Sonde: {sonde}",f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}",f"Site: {site}",f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}"] txt="\n".join(lines); return subject,txt,txt # ========= Gyrophare MQTT ========= class MQTTPublisher: def __init__(self, site: str): self.enabled = (_mqtt_ok and (os.getenv("GYRO_MODE", "").lower() == "mqtt")) self.site = site self.topic = (os.getenv(f"GYRO_MQTT_TOPIC_{site}") or os.getenv(f"GYRO_MQTT_TOPIC_{site.capitalize()}")) self.last_state: bool | None = None if not self.enabled: log.info("Gyro MQTT désactivé (GYRO_MODE != mqtt ou paho-mqtt absent).") return if not self.topic: log.warning("Topic MQTT manquant pour %s (GYRO_MQTT_TOPIC_%s)", site, site) self.enabled = False return host = os.getenv("MQTT_HOST", "localhost") port = int(os.getenv("MQTT_PORT", "1883")) user = os.getenv("MQTT_USER") pwd = os.getenv("MQTT_PASS") tls = (os.getenv("MQTT_TLS", "0") == "1") # --- Création du client MQTT : compatible paho 1.x et 2.x --- cbver = getattr(mqtt, "CallbackAPIVersion", None) if cbver is not None: # paho >= 2.x : on choisit la meilleure constante disponible api_v = ( getattr(cbver, "VERSION2", None) # paho 2.x or getattr(cbver, "V5", None) # certaines builds or getattr(cbver, "v5", None) # fallback or getattr(cbver, "V311", None) # dernier recours ) try: self.client = mqtt.Client(callback_api_version=api_v) if api_v else mqtt.Client() except TypeError: # vieux paho ne supporte pas l’argument callback_api_version self.client = mqtt.Client() else: # paho 1.x self.client = mqtt.Client() # ------------------------------------------------------------ if user and pwd: self.client.username_pw_set(user, pwd) if tls: self.client.tls_set() try: self.client.connect(host, port, keepalive=30) self.client.loop_start() log.info("MQTT connecté (%s:%s), topic=%s", host, port, self.topic) except Exception as e: log.exception("MQTT connexion impossible: %s", e) self.enabled = False def set(self, on: bool): if not self.enabled: return if self.last_state is not None and self.last_state == on: return payload = "on" if on else "off" try: r = self.client.publish(self.topic, payload=payload, qos=1, retain=True) r.wait_for_publish(timeout=3) if r.rc != 0: log.warning("MQTT publish rc=%s (topic=%s)", r.rc, self.topic) else: log.info("Gyro %s -> %s (MQTT)", self.site, payload.upper()) self.last_state = on except Exception as e: log.exception("MQTT publish erreur: %s", e) notifier = Notifier() beacon = MQTTPublisher(SITE) def notifier_sur_depassement(site,sonde,temp,seuil): subject,sms_text,email_body=build_alert_text(site,sonde,temp,seuil) notifier.send_sms(sms_text); notifier.send_email(subject,email_body) try: beacon.set(True) except Exception: pass def notifier_acquittement(site,sonde,temp,seuil): subject,sms_text,_=build_ok_text(site,sonde,temp,seuil) notifier.send_sms(sms_text) try: if not any_alert_open(site): beacon.set(False) except Exception: pass def run_monitor_cycle(site: str = SITE): sondes=lire_sondes_depuis_db(site); seuils=lire_seuils_depuis_db(site) for r in sondes: nom=str(r["Sonde"]); temp=float(r["Temperature"]); seuil=float(seuils.get(nom,6.0)) if temp>seuil: if depassement_depuis_30min(site,nom,seuil) and not alerte_en_cours(site,nom): creer_alerte(site,nom); notifier_sur_depassement(site,nom,temp,seuil) else: if alerte_en_cours(site,nom): acquitter_alerte(site,nom); notifier_acquittement(site,nom,temp,seuil) def run_monitor_loop(site: str = SITE, period_sec: int = 300): log.info("%s démarré (site=%s, période=%ss) ✅", PROGRAM_NAME, site, period_sec) while True: t0=time.time() try: run_monitor_cycle(site) except Exception as err: log.exception("Erreur cycle monitoring: %s",err) time.sleep(max(0, period_sec-(time.time()-t0))) if __name__=="__main__": import argparse p=argparse.ArgumentParser(description=PROGRAM_NAME) p.add_argument("--period",type=int,default=300) p.add_argument("--test-sms",action="store_true") p.add_argument("--test-mail",action="store_true") p.add_argument("--test-alert",action="store_true") p.add_argument("--test-ok",action="store_true") p.add_argument("--once",action="store_true") args=p.parse_args() if args.test_sms: notifier.send_sms("TEST DOMO91 (transactionnel)") elif args.test_mail: notifier.send_email(f"[TEST {SITE}] Mail","OK") elif args.test_alert: notifier_sur_depassement(SITE,"Congelateur",-14.5,-15.0) elif args.test_ok: notifier_acquittement(SITE,"Congelateur",-15.2,-15.0) else: if args.once: run_monitor_cycle(SITE) else: run_monitor_loop(SITE,period_sec=args.period)