Ajout de chat a Monitor_Saclay

This commit is contained in:
2026-04-20 09:36:11 +02:00
parent 5f9d1c0911
commit f1203012df
7 changed files with 1520 additions and 272 deletions

4
.env
View File

@@ -11,6 +11,10 @@ MQTT_USER=sondes
MQTT_PASS=3J@bjYP0
MQTT_PORT_MEUDON=1883
#Synology Chat
SYNOLOGY_CHAT_WEBHOOK_URL=https://192.168.1.250/webapi/entry.cgi?api=SYNO.Chat.External&method=incoming&version=2&token=UN7nhD70vrhrHFh1VeDdOpsklIHiIFRop2qB7b6YusMEY3clY3R8CXe4hFzz4KKc
SYNOLOGY_CHAT_VERIFY_SSL=false
# Boucle rapide du gyro
GYRO_WINDOW_MIN=3
GYRO_NEEDED_POINTS=2

Binary file not shown.

View File

@@ -7,6 +7,8 @@ PROGRAM_NAME = f"Monitor_{SITE}"
# ========= Imports & .env =========
import os, re, time, ssl, smtplib, logging
from typing import Any, cast
import requests
import datetime as dt
from email.message import EmailMessage
from datetime import datetime
@@ -14,6 +16,7 @@ from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv(usecwd=True), override=False)
from utils_sms import normaliser_sms
def _env_bool(name: str, default: bool) -> bool:
v = os.getenv(name, str(int(default))).strip().lower()
return v in ("1", "true", "yes", "on")
@@ -42,7 +45,7 @@ except Exception:
_mqtt_ok = False
# ========= Logger =========
level = getattr(logging, os.getenv("LOGLEVEL", "INFO").upper(), logging.INFO)
level = getattr(logging, (os.getenv("LOGLEVEL", "INFO") or "INFO").upper(), logging.INFO)
log = logging.getLogger(PROGRAM_NAME.lower())
if not log.handlers:
logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")
@@ -86,6 +89,14 @@ def close_alert(conn, table_alertes: str, sonde: str) -> bool:
cur.close()
return changed
def _to_float(value: Any) -> float:
return float(cast(Any, value))
def _to_datetime(value: Any) -> datetime:
if isinstance(value, datetime):
return value
raise TypeError(f"datetime attendu, reçu: {type(value)!r}")
def get_db():
return mysql.connector.connect(
host=os.getenv("DB_HOST"),
@@ -107,7 +118,7 @@ def insert_gyro_log(lieu: str, etat: str, topic: str, payload_raw: str,
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
(
lieu,
os.getenv("GYRO_SONDE_NAME", "Gyro"),
_env_str("GYRO_SONDE_NAME", "Gyro"),
etat, # 'ON' ou 'OFF'
when.strftime('%Y-%m-%d %H:%M:%S'),
topic,
@@ -135,7 +146,7 @@ def should_insert_gyro(lieu: str, etat: str, sonde: str = "Gyro") -> bool:
cnx.close()
# --- Lecture des dernières mesures de température (en ignorant lignes d'état) ---
def lire_sondes_depuis_db(site: str):
def lire_sondes_depuis_db(site: str) -> list[dict[str, Any]]:
table = site
sql = f"""
SELECT t1.Sonde, t1.Temperature, t1.Date
@@ -152,9 +163,9 @@ def lire_sondes_depuis_db(site: str):
try:
cur = cnx.cursor(dictionary=True)
cur.execute(sql)
rows = cur.fetchall()
rows = cast(list[dict[str, Any]], cur.fetchall())
for r in rows:
r["Temperature"] = float(r["Temperature"]) # garanti NOT NULL
r["Temperature"] = float(r["Temperature"])
return rows
except MySQLError as err:
log.exception("Erreur DB (lire_sondes_depuis_db): %s", err)
@@ -163,19 +174,19 @@ def lire_sondes_depuis_db(site: str):
cnx.close()
def lire_cfg_chambres(site: str):
def lire_cfg_chambres(site: str) -> dict[str, dict[str, float | bool]]:
"""
Retourne {sonde: {"temp_max": float, "active": bool}}
depuis Chambres_froides pour le site.
"""
dbname = os.getenv("DB_NAME", "Sondes")
dbname = _env_str("DB_NAME", "Sondes")
sql = f"""
SELECT Sonde, Temp_Max, Etat
FROM `{dbname}`.`Chambres_froides`
WHERE Lieu=%s
"""
cnx = get_db()
cfg: dict[str, dict] = {}
cfg: dict[str, dict[str, float | bool]] = {}
try:
cur = cnx.cursor()
cur.execute(sql, (site,))
@@ -191,7 +202,7 @@ def lire_cfg_chambres(site: str):
finally:
cnx.close()
def compute_site_alarm(last_values: list[dict], cfg: dict[str, dict], hysteresis: float = 0.0):
def compute_site_alarm(last_values: list[dict[str, Any]], cfg: dict[str, dict[str, float | bool]], hysteresis: float = 0.0) -> tuple[bool, tuple[str, float, float] | None]:
"""
Retourne (is_on: bool, trigger: (sonde,temp,seuil) | None)
"""
@@ -200,20 +211,20 @@ def compute_site_alarm(last_values: list[dict], cfg: dict[str, dict], hysteresis
meta = cfg.get(sonde)
if not meta or not meta.get("active", False):
continue
temp = float(row["Temperature"])
seuil = float(meta["temp_max"])
temp = _to_float(row["Temperature"])
seuil = _to_float(meta["temp_max"])
if temp > seuil + float(hysteresis):
return True, (sonde, temp, seuil)
return False, None
def lire_seuils_depuis_db(site: str):
def lire_seuils_depuis_db(site: str) -> dict[str, float]:
sql = """
SELECT Sonde, Temp_Max
FROM Sondes.Chambres_froides
WHERE Lieu=%s AND Etat='ON'
"""
cnx = get_db()
seuils = {}
seuils: dict[str, float] = {}
try:
cur = cnx.cursor()
cur.execute(sql, (site, ))
@@ -233,8 +244,8 @@ def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool:
CONT_MIN = ALERT_CONTINUOUS_MINUTES (defaut 30)
LOOKBACK = ALERT_LOOKBACK_MINUTES (defaut max(60, CONT_MIN*3))
"""
CONT_MIN = int(os.getenv("ALERT_CONTINUOUS_MINUTES", "30"))
LOOKBACK = int(os.getenv("ALERT_LOOKBACK_MINUTES", str(max(60, int(os.getenv("ALERT_CONTINUOUS_MINUTES", "30"))*3))))
cont_min = int(_env_str("ALERT_CONTINUOUS_MINUTES", "30"))
lookback = int(_env_str("ALERT_LOOKBACK_MINUTES", str(max(60, int(_env_str("ALERT_CONTINUOUS_MINUTES", "30"))*3))))
table = site
cnx = get_db()
@@ -246,20 +257,22 @@ def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool:
WHERE Sonde=%s
AND Date >= (NOW() - INTERVAL %s MINUTE)
ORDER BY Date DESC
""", (sonde, LOOKBACK))
""", (sonde, lookback))
rows = cur.fetchall()
if not rows:
return False
last_temp, last_dt = float(rows[0][0]), rows[0][1]
first_row = cast(tuple[Any, Any], rows[0])
last_temp = _to_float(first_row[0])
last_dt = _to_datetime(first_row[1])
if last_temp <= float(seuil):
return False
# Début de la séquence continue > seuil
start_dt = last_dt
for temp, d in rows[1:]:
if float(temp) > float(seuil):
start_dt = d
for temp, row_dt in rows[1:]:
if _to_float(temp) > float(seuil):
start_dt = _to_datetime(row_dt)
else:
break
@@ -267,8 +280,8 @@ def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool:
now = dt.datetime.now(tz=tzinfo)
dur_min = (now - start_dt).total_seconds() / 60.0
log.debug("Seq>seuil %s: start=%s, now=%s, dur=%.1fmin, need>=%d",
sonde, start_dt, now, dur_min, CONT_MIN)
return dur_min >= CONT_MIN
sonde, start_dt, now, dur_min, cont_min)
return dur_min >= cont_min
except MySQLError as err:
log.exception("Erreur DB (depassement_depuis_30min, continu): %s", err)
@@ -294,57 +307,110 @@ def _parse_labeled_phones(raw: str | None) -> list[tuple[str, str]]:
return out
def _resolve_sms_receivers(labeled: list[tuple[str, str]]) -> list[str]:
only = os.getenv("ALERT_SMS_ONLY")
only = _env_str("ALERT_SMS_ONLY")
if not only:
return [num for (_n, num) in labeled]
allow = {x.strip() for x in re.split(r"[;,]", only) if x.strip()}
return [num for (name, num) in labeled if (name and name in allow) or (num in allow)]
def _human_labeled_list(labeled: list[tuple[str, str]]) -> str:
return ", ".join([f"{n}({p})" if n else p for n, p in labeled])
return ", ".join([f"{name}({phone})" if name else phone for name, phone in labeled])
# ========= Synology Chat =========
def _env_str(name: str, default: str = "") -> str:
return (os.getenv(name, default) or "").strip()
def synology_chat_enabled() -> bool:
return bool(
_env_str(f"SYNO_CHAT_WEBHOOK_{SITE}") or
_env_str(f"SYNO_CHAT_WEBHOOK_{SITE.upper()}") or
_env_str("SYNO_CHAT_WEBHOOK")
)
def send_synology_chat(message: str, *, username: str | None = None) -> bool:
"""
Envoie un message sur Synology Chat via webhook entrant.
Variables supportées :
- SYNO_CHAT_WEBHOOK_{SITE} ou SYNO_CHAT_WEBHOOK_{SITE.upper()}
- SYNO_CHAT_WEBHOOK
- SYNO_CHAT_BOTNAME (optionnel)
- SYNO_CHAT_TIMEOUT (optionnel, défaut 10s)
"""
webhook = (
_env_str(f"SYNO_CHAT_WEBHOOK_{SITE}") or
_env_str(f"SYNO_CHAT_WEBHOOK_{SITE.upper()}") or
_env_str("SYNO_CHAT_WEBHOOK")
)
if not webhook:
log.info("Synology Chat non configuré.")
return False
payload = {"text": message}
botname = username or _env_str("SYNO_CHAT_BOTNAME")
if botname:
payload["username"] = botname
timeout = int(_env_str("SYNO_CHAT_TIMEOUT", "10"))
try:
r = requests.post(webhook, json=payload, timeout=timeout)
r.raise_for_status()
# Certains webhooks Synology répondent "ok" ou JSON {success:true}
body = (r.text or "").strip().lower()
if body and body not in ("ok", '{"success":true}'):
log.info("Réponse Synology Chat: %s", r.text[:200])
log.info("Notification Synology Chat envoyée.")
return True
except requests.RequestException as e:
log.exception("Echec envoi Synology Chat: %s", e)
return False
# ========= Notifier (SMS interne + SMS client + Mail) =========
class Notifier:
def __init__(self):
# OVH SMS
self.ovh_enabled = _ovh_available and all(
os.getenv(k) for k in ("OVH_APPLICATION_KEY","OVH_APPLICATION_SECRET","OVH_CONSUMER_KEY","OVH_SMS_SERVICE","OVH_SMS_SENDER")
_env_str(k) for k in ("OVH_APPLICATION_KEY","OVH_APPLICATION_SECRET","OVH_CONSUMER_KEY","OVH_SMS_SERVICE","OVH_SMS_SENDER")
)
self.ovh_client: Any | None = None
self.ovh_service: str = ""
self.ovh_sender: str = ""
if self.ovh_enabled:
assert ovh is not None
self.ovh_client = ovh.Client(
endpoint=os.getenv("OVH_ENDPOINT","ovh-eu"),
application_key=os.getenv("OVH_APPLICATION_KEY"),
application_secret=os.getenv("OVH_APPLICATION_SECRET"),
consumer_key=os.getenv("OVH_CONSUMER_KEY"),
endpoint=_env_str("OVH_ENDPOINT", "ovh-eu"),
application_key=_env_str("OVH_APPLICATION_KEY"),
application_secret=_env_str("OVH_APPLICATION_SECRET"),
consumer_key=_env_str("OVH_CONSUMER_KEY"),
)
self.ovh_service = os.getenv("OVH_SMS_SERVICE")
self.ovh_sender = os.getenv("OVH_SMS_SENDER")
raw_sms = (os.getenv(f"ALERT_SMS_TO_{SITE}") or os.getenv(f"ALERT_SMS_TO_{SITE.upper()}") or os.getenv("ALERT_SMS_TO"))
self.ovh_service = _env_str("OVH_SMS_SERVICE")
self.ovh_sender = _env_str("OVH_SMS_SENDER")
raw_sms = (_env_str(f"ALERT_SMS_TO_{SITE}") or _env_str(f"ALERT_SMS_TO_{SITE.upper()}") or _env_str("ALERT_SMS_TO"))
self.sms_labeled = _parse_labeled_phones(raw_sms)
else:
self.sms_labeled = []
# SMS CLIENTS (site-spécifique + génériques + compat FR)
raw_sms_client = (
os.getenv(f"ALERT_SMS_CLIENT_TO_{SITE}") or
os.getenv(f"ALERT_SMS_CLIENT_TO_{SITE.upper()}") or
os.getenv("ALERT_SMS_CLIENT_TO") or
os.getenv(f"ALERTE_CLIENT_{SITE}") or
os.getenv("ALERTE_CLIENT")
_env_str(f"ALERT_SMS_CLIENT_TO_{SITE}") or
_env_str(f"ALERT_SMS_CLIENT_TO_{SITE.upper()}") or
_env_str("ALERT_SMS_CLIENT_TO") or
_env_str(f"ALERTE_CLIENT_{SITE}") or
_env_str("ALERTE_CLIENT")
)
self.sms_client_labeled = _parse_labeled_phones(raw_sms_client)
self.sms_client_enabled = (os.getenv("ALERT_SMS_CLIENT_ENABLED", "1") == "1")
self.sms_client_enabled = (_env_str("ALERT_SMS_CLIENT_ENABLED", "1") == "1")
# SMTP
self.smtp_host = os.getenv("SMTP_HOST")
self.smtp_port = int(os.getenv("SMTP_PORT","465"))
self.smtp_user = os.getenv("SMTP_USER")
self.smtp_pass = os.getenv("SMTP_PASS")
self.smtp_security = (os.getenv("SMTP_SECURITY","SSL") or "SSL").upper()
self.smtp_host: str = _env_str("SMTP_HOST")
self.smtp_port = int(_env_str("SMTP_PORT", "465"))
self.smtp_user: str = _env_str("SMTP_USER")
self.smtp_pass: str = _env_str("SMTP_PASS")
self.smtp_security: str = _env_str("SMTP_SECURITY", "SSL").upper()
raw_mail_to = (os.getenv(f"MAIL_TO_{SITE}") or os.getenv(f"MAIL_TO_{SITE.upper()}") or os.getenv("MAIL_TO") or "")
raw_mail_to = (_env_str(f"MAIL_TO_{SITE}") or _env_str(f"MAIL_TO_{SITE.upper()}") or _env_str("MAIL_TO"))
self.mail_to = _split_list(raw_mail_to)
self.mail_from = (os.getenv(f"MAIL_FROM_{SITE}") or os.getenv(f"MAIL_FROM_{SITE.upper()}") or os.getenv("MAIL_FROM") or self.smtp_user)
self.mail_from: str = (_env_str(f"MAIL_FROM_{SITE}") or _env_str(f"MAIL_FROM_{SITE.upper()}") or _env_str("MAIL_FROM") or self.smtp_user)
self.smtp_enabled = all([self.smtp_host, self.smtp_port, self.smtp_user, self.smtp_pass, self.mail_to])
@@ -373,15 +439,16 @@ class Notifier:
"tag": tag,
}
try:
log.info("Envoi SMS vers: %s", _human_labeled_list([(n,p) for (n,p) in self.sms_labeled if p in receivers]))
resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
ovh_client = cast(Any, self.ovh_client)
log.info("Envoi SMS vers: %s", _human_labeled_list([(name, phone) for (name, phone) in self.sms_labeled if phone in receivers]))
resp = ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
ids = resp.get("ids") or []
log.info("SMS OVH envoyé (job ids=%s)", ids)
try:
if ids:
job_id = ids[0]
for _ in range(3):
job = self.ovh_client.get(f"/sms/{self.ovh_service}/jobs/{job_id}")
job = ovh_client.get(f"/sms/{self.ovh_service}/jobs/{job_id}")
if job.get("status") in ("done","error","cancelled"):
log.info("Statut job SMS: %s", job.get("status")); break
time.sleep(1.5)
@@ -399,7 +466,7 @@ class Notifier:
if not self.sms_client_enabled or not self.sms_client_labeled:
log.info("SMS client: désactivé ou aucun destinataire."); return False
only = os.getenv("ALERT_SMS_CLIENT_ONLY")
only = _env_str("ALERT_SMS_CLIENT_ONLY")
if only:
allow = {x.strip() for x in re.split(r"[;,]", only) if x.strip()}
labeled = [(n, p) for (n, p) in self.sms_client_labeled if (n and n in allow) or (p in allow)]
@@ -426,7 +493,8 @@ class Notifier:
}
try:
log.info("Envoi SMS CLIENT vers: %s", _human_labeled_list(labeled))
resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
ovh_client = cast(Any, self.ovh_client)
resp = ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
log.info("SMS CLIENT OVH envoyé (job ids=%s)", resp.get("ids"))
return True
except Exception as err:
@@ -442,8 +510,8 @@ class Notifier:
msg["Subject"] = subject
msg.set_content(body)
timeout = int(os.getenv("SMTP_TIMEOUT","60"))
debug = os.getenv("SMTP_DEBUG","0") == "1"
timeout = int(_env_str("SMTP_TIMEOUT", "60"))
debug = _env_str("SMTP_DEBUG", "0") == "1"
def _send_ssl():
with smtplib.SMTP_SSL(self.smtp_host, 465, context=ssl.create_default_context(), timeout=timeout) as server:
@@ -484,46 +552,64 @@ def fmt_deg(v: float) -> str:
def now_paris() -> dt.datetime:
return dt.datetime.now(tz=PARIS)
def build_alert_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
when = when or now_paris()
def build_alert_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None) -> tuple[str, str, str]:
when_dt = when if when is not None else now_paris()
subject = f"[ALERTE {site}] {sonde} au-dessus du seuil"
lines = [
subject + ":",
f"Sonde: {sonde}",
f"Température: {fmt_deg(temp)} (seuil {fmt_deg(seuil)})",
f"Site: {site}",
f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}"
f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}"
]
txt = "\n".join(lines)
return subject, txt, txt
def build_ok_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
when = when or now_paris()
def build_ok_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None) -> tuple[str, str, str]:
when_dt = when if when is not None else now_paris()
subject = f"[OK {site}] {sonde} revenue normale"
lines = [
subject + ":",
f"Sonde: {sonde}",
f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}",
f"Site: {site}",
f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}"
f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}"
]
txt = "\n".join(lines)
return subject, txt, txt
def build_client_alert_sms(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None) -> str:
when = when or now_paris()
when_dt = when if when is not None else now_paris()
# Court, 1 ligne; accents/° nettoyés par normaliser_sms
return f"ALERTE CLIENT {sonde}: T={fmt_deg(temp)} > S={fmt_deg(seuil)} H:{when.strftime('%H:%M')}"
return f"ALERTE CLIENT {sonde}: T={fmt_deg(temp)} > S={fmt_deg(seuil)} H:{when_dt.strftime('%H:%M')}"
def build_gyro_chat_alert(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None) -> str:
when_dt = when if when is not None else now_paris()
return (
f":rotating_light: [{site}] GYRO DECLENCHE\n"
f"Sonde: {sonde}\n"
f"Temperature: {fmt_deg(temp)} > seuil {fmt_deg(seuil)}\n"
f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}"
)
def build_gyro_chat_ok(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None) -> str:
when_dt = when if when is not None else now_paris()
return (
f":white_check_mark: [{site}] GYRO RETOUR NORMALE\n"
f"Sonde: {sonde}\n"
f"Temperature: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}\n"
f"Heure: {when_dt.strftime('%Y-%m-%d %H:%M:%S')}"
)
# ========= Gyrophare MQTT =========
class MQTTPublisher:
def __init__(self, site: str):
self.enabled = bool(_mqtt_ok)
self.site = site
self.topic = (
os.getenv(f"GYRO_MQTT_TOPIC_{site}") or
os.getenv(f"GYRO_MQTT_TOPIC_{site.upper()}") or
os.getenv("GYRO_MQTT_TOPIC") or
self.topic: str = (
_env_str(f"GYRO_MQTT_TOPIC_{site}") or
_env_str(f"GYRO_MQTT_TOPIC_{site.upper()}") or
_env_str("GYRO_MQTT_TOPIC") or
f"Sondes/{site}/Gyro/cmd"
)
self.last_state: bool | None = None
@@ -536,27 +622,17 @@ class MQTTPublisher:
self.enabled = False
return
host = os.getenv("MQTT_HOST", "localhost")
port = int(os.getenv("MQTT_PORT", "1883"))
user = os.getenv("MQTT_USER")
pwd = os.getenv("MQTT_PASS")
tls = (os.getenv("MQTT_TLS", "0") == "1")
host = _env_str("MQTT_HOST", "localhost")
port = int(_env_str("MQTT_PORT", "1883"))
user = _env_str("MQTT_USER")
pwd = _env_str("MQTT_PASS")
tls = (_env_str("MQTT_TLS", "0") == "1")
# --- Création du client MQTT : compatible paho 1.x et 2.x ---
cbver = getattr(mqtt, "CallbackAPIVersion", None)
if cbver is not None:
api_v = (
getattr(cbver, "VERSION2", None)
or getattr(cbver, "V5", None)
or getattr(cbver, "v5", None)
or getattr(cbver, "V311", None)
)
try:
self.client = mqtt.Client(callback_api_version=api_v) if api_v else mqtt.Client()
except TypeError:
self.client = mqtt.Client()
else:
try:
self.client = mqtt.Client()
except TypeError:
self.client = mqtt.Client(client_id="")
# ------------------------------------------------------------
if user and pwd:
@@ -572,10 +648,9 @@ class MQTTPublisher:
# Abonnements (depuis env ou valeurs par défaut raisonnables)
subs_env = (
os.getenv(f"GYRO_MQTT_SUB_{site}") or
os.getenv(f"GYRO_MQTT_SUB_{site.upper()}") or
os.getenv("GYRO_MQTT_SUB") or
""
_env_str(f"GYRO_MQTT_SUB_{site}") or
_env_str(f"GYRO_MQTT_SUB_{site.upper()}") or
_env_str("GYRO_MQTT_SUB")
)
subs = [t.strip() for t in subs_env.split(",") if t.strip()]
if not subs:
@@ -599,9 +674,9 @@ class MQTTPublisher:
self.enabled = False
# --- Callback réception MQTT ---
def _on_message(self, client, userdata, msg):
def _on_message(self, _client, _userdata, msg):
lieu = self.site
topic = msg.topic
topic = str(msg.topic)
payload_raw = msg.payload.decode(errors="ignore").strip()
upper = payload_raw.upper()
@@ -681,10 +756,10 @@ class GyroPulseController:
- SMS OK immédiat à lextinction (activé par défaut)
"""
def __init__(self, site: str, beacon, notifier, *,
check_sec: int = int(os.getenv("GYRO_CHECK_SEC", "20")),
pulse_sec: int = int(os.getenv("GYRO_PULSE_SEC", "60")),
cooldown_sec: int = int(os.getenv("GYRO_COOLDOWN_SEC", "600")),
normal_confirm: int = int(os.getenv("GYRO_NORMAL_CONFIRM", "2"))):
check_sec: int = int(_env_str("GYRO_CHECK_SEC", "20")),
pulse_sec: int = int(_env_str("GYRO_PULSE_SEC", "60")),
cooldown_sec: int = int(_env_str("GYRO_COOLDOWN_SEC", "600")),
normal_confirm: int = int(_env_str("GYRO_NORMAL_CONFIRM", "2"))):
self.site = site
self.beacon = beacon
self.notifier = notifier
@@ -698,13 +773,13 @@ class GyroPulseController:
self._t_cooldown_end = 0.0
self._normal_count = 0
self._stop = threading.Event()
self._thread = None
self._current = None # dernier état effectif
self._thread: threading.Thread | None = None
self._current: bool | None = None # dernier état effectif
# Anti-spam SMS & SMS OK activé par défaut
self._last_sms: dict[str, float] = {} # {sonde: ts dernier envoi}
self._sms_min_sec = int(os.getenv("ALERT_SMS_COOLDOWN_SEC") or os.getenv("GYRO_SMS_MIN_SEC", "120"))
self._send_ok = (os.getenv("ALERT_OK_SMS_GYRO", "1") == "1")
self._sms_min_sec = int(_env_str("ALERT_SMS_COOLDOWN_SEC") or _env_str("GYRO_SMS_MIN_SEC", "120"))
self._send_ok = (_env_str("ALERT_OK_SMS_GYRO", "1") == "1")
# Conserver le dernier déclencheur (pour SMS OK)
self._last_trigger: tuple[str, float, float] | None = None # (sonde, temp, seuil)
@@ -735,18 +810,20 @@ class GyroPulseController:
return False
def _send_alert_sms(self, trigger: tuple[str, float, float] | None):
if not _env_bool("ALERT_INTERNAL_SMS_ENABLED", True):
return
if not trigger:
return
sonde, temp, seuil = trigger
if self._sms_can_send(sonde):
# Notification Synology Chat immediate sur declenchement Gyro
if _env_bool("SYNO_CHAT_GYRO_ENABLED", True):
chat_msg = build_gyro_chat_alert(self.site, sonde, temp, seuil, when=now_paris())
send_synology_chat(chat_msg)
if _env_bool("ALERT_INTERNAL_SMS_ENABLED", True) and self._sms_can_send(sonde):
_, sms_text, _ = build_alert_text(self.site, sonde, temp, seuil, when=now_paris())
self.notifier.send_sms(sms_text)
def _send_ok_sms_from_last_trigger(self):
if not _env_bool("ALERT_OK_SMS_GYRO", True):
return
if not self._send_ok or not self._last_trigger:
return
sonde, _temp_prev, seuil = self._last_trigger
@@ -760,7 +837,11 @@ class GyroPulseController:
if curr_temp is None:
curr_temp = seuil - 0.1 # fallback léger
if self._sms_can_send(sonde):
if _env_bool("SYNO_CHAT_GYRO_ENABLED", True):
chat_msg = build_gyro_chat_ok(self.site, sonde, curr_temp, seuil, when=now_paris())
send_synology_chat(chat_msg)
if _env_bool("ALERT_OK_SMS_GYRO", True) and self._sms_can_send(sonde):
_, sms_text, _ = build_ok_text(self.site, sonde, curr_temp, seuil, when=now_paris())
self.notifier.send_sms(sms_text)
@@ -769,7 +850,7 @@ class GyroPulseController:
def _is_alarm_now(self) -> tuple[bool, tuple[str, float, float] | None]:
last_rows = lire_sondes_depuis_db(self.site)
cfg = lire_cfg_chambres(self.site)
return compute_site_alarm(last_rows, cfg, hysteresis=float(os.getenv("GYRO_HYSTERESIS", "0.0")))
return compute_site_alarm(last_rows, cfg, hysteresis=float(_env_str("GYRO_HYSTERESIS", "0.0")))
def _run(self):
while not self._stop.is_set():
@@ -790,9 +871,9 @@ class GyroPulseController:
if trigger:
s, t, se = trigger
log.info("Gyro → ON déclenché par %s: %.2f > %.2f (mode %s)",
s, t, se, "CONTINU" if os.getenv("GYRO_MODE_CONTINUOUS", "1") == "1" else "PULSE")
s, t, se, "CONTINU" if _env_str("GYRO_MODE_CONTINUOUS", "1") == "1" else "PULSE")
# SMS alerte immédiat (optionnel)
if os.getenv("ALERT_INTERNAL_SMS_ENABLED", "0") == "1":
if _env_str("ALERT_INTERNAL_SMS_ENABLED", "0") == "1":
self._send_alert_sms(trigger)
elif self.state == _GyroState.PULSE_ON:
@@ -804,11 +885,11 @@ class GyroPulseController:
self._normal_count = 0
log.info("Gyro → OFF (retour à la normale confirmé)")
# SMS OK immédiat
if os.getenv("ALERT_OK_SMS_GYRO", "0") == "1":
if _env_str("ALERT_OK_SMS_GYRO", "0") == "1":
self._send_ok_sms_from_last_trigger()
else:
self._normal_count = 0
if os.getenv("GYRO_MODE_CONTINUOUS", "1") != "1":
if _env_str("GYRO_MODE_CONTINUOUS", "1") != "1":
if now >= self._t_pulse_end:
self._set_gyro(False)
self._t_cooldown_end = now + self.cooldown_sec
@@ -846,7 +927,7 @@ def notifier_sur_depassement(site: str, sonde: str, temp: float, seuil: float):
notifier.send_email(subject, email_body) # MAIL (≥30 min)
# SMS client couplé au mail 30 min
if os.getenv("ALERT_SMS_CLIENT_ENABLED", "1") == "1":
if _env_str("ALERT_SMS_CLIENT_ENABLED", "1") == "1":
client_msg = build_client_alert_sms(site, sonde, temp, seuil)
notifier.send_sms_client(client_msg, tag=f"client-{SITE.lower()}")
@@ -858,7 +939,7 @@ def notifier_acquittement(site: str, sonde: str, temp: float, seuil: float):
subject, sms_text, email_body = build_ok_text(site, sonde, temp, seuil)
notifier.send_email(subject, email_body) # mail d'acquittement
# Optionnel: SMS "OK" côté cycle si souhaité
if os.getenv("ALERT_OK_SMS", "0") == "1":
if _env_str("ALERT_OK_SMS", "0") == "1":
notifier.send_sms(sms_text)
# ========= Cycle & boucle =========
@@ -869,7 +950,7 @@ def run_monitor_cycle(site: str = SITE):
# 2) Info: état instantané (le gyro est piloté par la boucle rapide)
try:
gyro_on, trigger = compute_site_alarm(last_rows, cfg, hysteresis=float(os.getenv("GYRO_HYSTERESIS", "0.0")))
_gyro_on, trigger = compute_site_alarm(last_rows, cfg, hysteresis=float(_env_str("GYRO_HYSTERESIS", "0.0")))
if trigger:
s, t, se = trigger
log.info("Dépassement détecté (gyro géré par boucle rapide) : %s %.2f > %.2f", s, t, se)
@@ -939,6 +1020,7 @@ if __name__ == "__main__":
p.add_argument("--test-mail", action="store_true")
p.add_argument("--test-alert", action="store_true")
p.add_argument("--test-ok", action="store_true")
p.add_argument("--test-chat", action="store_true")
p.add_argument("--once", action="store_true")
args = p.parse_args()
@@ -951,6 +1033,8 @@ if __name__ == "__main__":
notifier_sur_depassement(SITE, "Congelateur", -14.5, -15.0)
elif args.test_ok:
notifier_acquittement(SITE, "Congelateur", -15.2, -15.0)
elif args.test_chat:
send_synology_chat(f":speech_balloon: [TEST {SITE}] Notification Synology Chat OK")
else:
if args.once:
run_monitor_cycle(SITE)

958
app/Monitor_Saclay_old.py Normal file
View File

@@ -0,0 +1,958 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ========= Site =========
SITE = "Saclay"
PROGRAM_NAME = f"Monitor_{SITE}"
# ========= Imports & .env =========
import os, re, time, ssl, smtplib, logging
import datetime as dt
from email.message import EmailMessage
from datetime import datetime
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv(usecwd=True), override=False)
from utils_sms import normaliser_sms
def _env_bool(name: str, default: bool) -> bool:
v = os.getenv(name, str(int(default))).strip().lower()
return v in ("1", "true", "yes", "on")
# MySQL
import mysql.connector
from mysql.connector import Error as MySQLError
from dotenv import load_dotenv
load_dotenv()
# OVH (SMS)
try:
import ovh
from ovh.exceptions import APIError as OVHAPIError
_ovh_available = True
except Exception:
ovh = None # type: ignore
class OVHAPIError(Exception): ...
_ovh_available = False
# MQTT
try:
import paho.mqtt.client as mqtt
_mqtt_ok = True
except Exception:
_mqtt_ok = False
# ========= Logger =========
level = getattr(logging, os.getenv("LOGLEVEL", "INFO").upper(), logging.INFO)
log = logging.getLogger(PROGRAM_NAME.lower())
if not log.handlers:
logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")
# ========= DB utils =========
def open_alert(conn, table_alertes: str, sonde: str, dt_: datetime) -> bool:
"""
Ouvre UNE alerte si aucune alerte 'En cours' n'existe encore pour la sonde.
Retourne True si une nouvelle alerte a été créée (→ notifier par mail & SMS client).
"""
cur = conn.cursor()
cur.execute(
f"SELECT 1 FROM `{table_alertes}` WHERE Sonde=%s AND Etat='En cours' LIMIT 1",
(sonde,)
)
if cur.fetchone():
cur.close()
return False # déjà ouverte
cur.execute(
f"INSERT INTO `{table_alertes}` (Sonde, Debut_defaut, Etat) VALUES (%s, %s, 'En cours')",
(sonde, dt_.strftime('%Y-%m-%d %H:%M:%S'))
)
conn.commit()
cur.close()
return True
def close_alert(conn, table_alertes: str, sonde: str) -> bool:
"""
Ferme l'alerte 'En cours' si présente.
Retourne True si une alerte est passée à 'Acquitté' (→ notifier par mail).
"""
cur = conn.cursor()
cur.execute(
f"UPDATE `{table_alertes}` SET Etat='Acquitté' "
f"WHERE Sonde=%s AND Etat='En cours' "
f"ORDER BY Debut_defaut DESC LIMIT 1",
(sonde,)
)
changed = (cur.rowcount == 1)
conn.commit()
cur.close()
return changed
def get_db():
return mysql.connector.connect(
host=os.getenv("DB_HOST"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASS"),
database=os.getenv("DB_NAME", "Sondes"),
port=int(os.getenv("DB_PORT", "3306")),
autocommit=True,
)
# --- Journalisation Gyro en table dédiée `Gyro` ---
def insert_gyro_log(lieu: str, etat: str, topic: str, payload_raw: str,
qos: int | None, retained: int | None, when: datetime):
cnx = get_db()
try:
cur = cnx.cursor()
cur.execute(
"INSERT INTO Sondes.Gyro (Lieu, Sonde, Etat, Date, Topic, Payload, QoS, Retained) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
(
lieu,
os.getenv("GYRO_SONDE_NAME", "Gyro"),
etat, # 'ON' ou 'OFF'
when.strftime('%Y-%m-%d %H:%M:%S'),
topic,
payload_raw,
qos,
retained
)
)
cnx.commit()
log.info("Gyro inséré: %s %s (%s)", lieu, etat, topic)
except MySQLError as err:
log.exception("Erreur DB insert_gyro_log: %s", err)
finally:
cnx.close()
def should_insert_gyro(lieu: str, etat: str, sonde: str = "Gyro") -> bool:
sql = "SELECT Etat FROM Sondes.Gyro WHERE Lieu=%s AND Sonde=%s ORDER BY Date DESC LIMIT 1"
cnx = get_db()
try:
cur = cnx.cursor()
cur.execute(sql, (lieu, sonde))
row = cur.fetchone()
return (row is None) or (row[0] != etat)
finally:
cnx.close()
# --- Lecture des dernières mesures de température (en ignorant lignes d'état) ---
def lire_sondes_depuis_db(site: str):
table = site
sql = f"""
SELECT t1.Sonde, t1.Temperature, t1.Date
FROM `{table}` t1
JOIN (
SELECT Sonde, MAX(Date) AS MaxDate
FROM `{table}`
WHERE Temperature IS NOT NULL
GROUP BY Sonde
) t2 ON t1.Sonde=t2.Sonde AND t1.Date=t2.MaxDate
WHERE t1.Temperature IS NOT NULL
"""
cnx = get_db()
try:
cur = cnx.cursor(dictionary=True)
cur.execute(sql)
rows = cur.fetchall()
for r in rows:
r["Temperature"] = float(r["Temperature"]) # garanti NOT NULL
return rows
except MySQLError as err:
log.exception("Erreur DB (lire_sondes_depuis_db): %s", err)
return []
finally:
cnx.close()
def lire_cfg_chambres(site: str):
"""
Retourne {sonde: {"temp_max": float, "active": bool}}
depuis Chambres_froides pour le site.
"""
dbname = os.getenv("DB_NAME", "Sondes")
sql = f"""
SELECT Sonde, Temp_Max, Etat
FROM `{dbname}`.`Chambres_froides`
WHERE Lieu=%s
"""
cnx = get_db()
cfg: dict[str, dict] = {}
try:
cur = cnx.cursor()
cur.execute(sql, (site,))
for sonde, temp_max, etat in cur.fetchall():
cfg[str(sonde)] = {
"temp_max": float(temp_max),
"active": str(etat).upper() == "ON",
}
return cfg
except MySQLError as err:
log.exception("Erreur DB (lire_cfg_chambres): %s", err)
return cfg
finally:
cnx.close()
def compute_site_alarm(last_values: list[dict], cfg: dict[str, dict], hysteresis: float = 0.0):
"""
Retourne (is_on: bool, trigger: (sonde,temp,seuil) | None)
"""
for row in last_values:
sonde = str(row["Sonde"])
meta = cfg.get(sonde)
if not meta or not meta.get("active", False):
continue
temp = float(row["Temperature"])
seuil = float(meta["temp_max"])
if temp > seuil + float(hysteresis):
return True, (sonde, temp, seuil)
return False, None
def lire_seuils_depuis_db(site: str):
sql = """
SELECT Sonde, Temp_Max
FROM Sondes.Chambres_froides
WHERE Lieu=%s AND Etat='ON'
"""
cnx = get_db()
seuils = {}
try:
cur = cnx.cursor()
cur.execute(sql, (site, ))
for sonde, s in cur.fetchall():
seuils[str(sonde)] = float(s)
return seuils
except MySQLError as err:
log.exception("Erreur DB (lire_seuils_depuis_db): %s", err)
return seuils
finally:
cnx.close()
# --- Dépassement continu (configurable) ---
def depassement_depuis_30min(site: str, sonde: str, seuil: float) -> bool:
"""
True si la sonde est > seuil de façon CONTINUE depuis CONT_MIN minutes.
CONT_MIN = ALERT_CONTINUOUS_MINUTES (defaut 30)
LOOKBACK = ALERT_LOOKBACK_MINUTES (defaut max(60, CONT_MIN*3))
"""
CONT_MIN = int(os.getenv("ALERT_CONTINUOUS_MINUTES", "30"))
LOOKBACK = int(os.getenv("ALERT_LOOKBACK_MINUTES", str(max(60, int(os.getenv("ALERT_CONTINUOUS_MINUTES", "30"))*3))))
table = site
cnx = get_db()
try:
cur = cnx.cursor()
cur.execute(f"""
SELECT Temperature, Date
FROM `{table}`
WHERE Sonde=%s
AND Date >= (NOW() - INTERVAL %s MINUTE)
ORDER BY Date DESC
""", (sonde, LOOKBACK))
rows = cur.fetchall()
if not rows:
return False
last_temp, last_dt = float(rows[0][0]), rows[0][1]
if last_temp <= float(seuil):
return False
# Début de la séquence continue > seuil
start_dt = last_dt
for temp, d in rows[1:]:
if float(temp) > float(seuil):
start_dt = d
else:
break
tzinfo = getattr(start_dt, "tzinfo", None)
now = dt.datetime.now(tz=tzinfo)
dur_min = (now - start_dt).total_seconds() / 60.0
log.debug("Seq>seuil %s: start=%s, now=%s, dur=%.1fmin, need>=%d",
sonde, start_dt, now, dur_min, CONT_MIN)
return dur_min >= CONT_MIN
except MySQLError as err:
log.exception("Erreur DB (depassement_depuis_30min, continu): %s", err)
return False
finally:
cnx.close()
# ========= Helpers listes/numéros =========
def _split_list(raw: str | None) -> list[str]:
return [x.strip() for x in re.split(r"[;,]", raw or "") if x.strip()]
def _parse_labeled_phones(raw: str | None) -> list[tuple[str, str]]:
out: list[tuple[str, str]] = []
for tok in re.split(r"[;,]", raw or ""):
tok = tok.strip()
if not tok:
continue
if ":" in tok:
name, num = tok.split(":", 1)
out.append((name.strip(), num.strip()))
else:
out.append(("", tok))
return out
def _resolve_sms_receivers(labeled: list[tuple[str, str]]) -> list[str]:
only = os.getenv("ALERT_SMS_ONLY")
if not only:
return [num for (_n, num) in labeled]
allow = {x.strip() for x in re.split(r"[;,]", only) if x.strip()}
return [num for (name, num) in labeled if (name and name in allow) or (num in allow)]
def _human_labeled_list(labeled: list[tuple[str, str]]) -> str:
return ", ".join([f"{n}({p})" if n else p for n, p in labeled])
# ========= Notifier (SMS interne + SMS client + Mail) =========
class Notifier:
def __init__(self):
# OVH SMS
self.ovh_enabled = _ovh_available and all(
os.getenv(k) for k in ("OVH_APPLICATION_KEY","OVH_APPLICATION_SECRET","OVH_CONSUMER_KEY","OVH_SMS_SERVICE","OVH_SMS_SENDER")
)
if self.ovh_enabled:
self.ovh_client = ovh.Client(
endpoint=os.getenv("OVH_ENDPOINT","ovh-eu"),
application_key=os.getenv("OVH_APPLICATION_KEY"),
application_secret=os.getenv("OVH_APPLICATION_SECRET"),
consumer_key=os.getenv("OVH_CONSUMER_KEY"),
)
self.ovh_service = os.getenv("OVH_SMS_SERVICE")
self.ovh_sender = os.getenv("OVH_SMS_SENDER")
raw_sms = (os.getenv(f"ALERT_SMS_TO_{SITE}") or os.getenv(f"ALERT_SMS_TO_{SITE.upper()}") or os.getenv("ALERT_SMS_TO"))
self.sms_labeled = _parse_labeled_phones(raw_sms)
else:
self.sms_labeled = []
# SMS CLIENTS (site-spécifique + génériques + compat FR)
raw_sms_client = (
os.getenv(f"ALERT_SMS_CLIENT_TO_{SITE}") or
os.getenv(f"ALERT_SMS_CLIENT_TO_{SITE.upper()}") or
os.getenv("ALERT_SMS_CLIENT_TO") or
os.getenv(f"ALERTE_CLIENT_{SITE}") or
os.getenv("ALERTE_CLIENT")
)
self.sms_client_labeled = _parse_labeled_phones(raw_sms_client)
self.sms_client_enabled = (os.getenv("ALERT_SMS_CLIENT_ENABLED", "1") == "1")
# SMTP
self.smtp_host = os.getenv("SMTP_HOST")
self.smtp_port = int(os.getenv("SMTP_PORT","465"))
self.smtp_user = os.getenv("SMTP_USER")
self.smtp_pass = os.getenv("SMTP_PASS")
self.smtp_security = (os.getenv("SMTP_SECURITY","SSL") or "SSL").upper()
raw_mail_to = (os.getenv(f"MAIL_TO_{SITE}") or os.getenv(f"MAIL_TO_{SITE.upper()}") or os.getenv("MAIL_TO") or "")
self.mail_to = _split_list(raw_mail_to)
self.mail_from = (os.getenv(f"MAIL_FROM_{SITE}") or os.getenv(f"MAIL_FROM_{SITE.upper()}") or os.getenv("MAIL_FROM") or self.smtp_user)
self.smtp_enabled = all([self.smtp_host, self.smtp_port, self.smtp_user, self.smtp_pass, self.mail_to])
def send_sms(self, message: str, tag: str = f"monitor-{SITE.lower()}") -> bool:
if not self.ovh_enabled or not self.sms_labeled:
log.warning("SMS désactivé ou aucun destinataire.")
return False
receivers = _resolve_sms_receivers(self.sms_labeled)
if not receivers:
log.warning("ALERT_SMS_ONLY filtre tous les destinataires (aucun envoi).")
return False
# ✅ Normalisation GSM-7 + préfixe site
message = normaliser_sms(message, prefix=SITE)
payload = {
"sender": self.ovh_sender,
"receivers": receivers,
"message": message,
"priority": "high",
"coding": "7bit",
"class": "phoneDisplay",
"noStopClause": True,
"senderForResponse": False,
"validityPeriod": 2880,
"tag": tag,
}
try:
log.info("Envoi SMS vers: %s", _human_labeled_list([(n,p) for (n,p) in self.sms_labeled if p in receivers]))
resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
ids = resp.get("ids") or []
log.info("SMS OVH envoyé (job ids=%s)", ids)
try:
if ids:
job_id = ids[0]
for _ in range(3):
job = self.ovh_client.get(f"/sms/{self.ovh_service}/jobs/{job_id}")
if job.get("status") in ("done","error","cancelled"):
log.info("Statut job SMS: %s", job.get("status")); break
time.sleep(1.5)
except Exception as e:
log.debug("Suivi job OVH indisponible (OK): %s", e)
return True
except OVHAPIError as err:
log.exception("Erreur API OVH: %s", err); return False
except Exception as err:
log.exception("Echec envoi SMS OVH: %s", err); return False
def send_sms_client(self, message: str, tag: str = f"monitor-client-{SITE.lower()}") -> bool:
if not self.ovh_enabled:
log.warning("SMS client: OVH non configuré."); return False
if not self.sms_client_enabled or not self.sms_client_labeled:
log.info("SMS client: désactivé ou aucun destinataire."); return False
only = os.getenv("ALERT_SMS_CLIENT_ONLY")
if only:
allow = {x.strip() for x in re.split(r"[;,]", only) if x.strip()}
labeled = [(n, p) for (n, p) in self.sms_client_labeled if (n and n in allow) or (p in allow)]
else:
labeled = self.sms_client_labeled
receivers = [num for (_n, num) in labeled]
if not receivers:
log.info("SMS client: filtre vide → aucun envoi."); return False
message = normaliser_sms(message, prefix=SITE)
payload = {
"sender": self.ovh_sender,
"receivers": receivers,
"message": message,
"priority": "high",
"coding": "7bit",
"class": "phoneDisplay",
"noStopClause": True,
"senderForResponse": False,
"validityPeriod": 2880,
"tag": tag,
}
try:
log.info("Envoi SMS CLIENT vers: %s", _human_labeled_list(labeled))
resp = self.ovh_client.post(f"/sms/{self.ovh_service}/jobs", **payload)
log.info("SMS CLIENT OVH envoyé (job ids=%s)", resp.get("ids"))
return True
except Exception as err:
log.exception("Echec SMS CLIENT OVH: %s", err); return False
def send_email(self, subject: str, body: str) -> bool:
if not self.smtp_enabled:
log.warning("SMTP non configuré, email non envoyé."); return False
msg = EmailMessage()
msg["From"] = self.mail_from
msg["To"] = ", ".join(self.mail_to)
msg["Subject"] = subject
msg.set_content(body)
timeout = int(os.getenv("SMTP_TIMEOUT","60"))
debug = os.getenv("SMTP_DEBUG","0") == "1"
def _send_ssl():
with smtplib.SMTP_SSL(self.smtp_host, 465, context=ssl.create_default_context(), timeout=timeout) as server:
if debug: server.set_debuglevel(1)
server.login(self.smtp_user, self.smtp_pass)
server.send_message(msg)
def _send_starttls():
with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=timeout) as server:
if debug: server.set_debuglevel(1)
server.ehlo(); server.starttls(context=ssl.create_default_context()); server.ehlo()
server.login(self.smtp_user, self.smtp_pass)
server.send_message(msg)
try:
if self.smtp_security == "STARTTLS":
try:
_send_starttls()
except (smtplib.SMTPServerDisconnected, TimeoutError, smtplib.SMTPConnectError) as err:
log.warning("STARTTLS/587 a échoué (%s). Tentative en SSL/465...", err)
_send_ssl()
else:
_send_ssl()
log.info("Email envoyé à %s", self.mail_to)
return True
except (smtplib.SMTPException, ssl.SSLError, TimeoutError) as err:
log.exception("Erreur SMTP: %s", err); return False
except Exception as err:
log.exception("Echec envoi email: %s", err); return False
# ========= Mise en forme messages =========
from zoneinfo import ZoneInfo
PARIS = ZoneInfo("Europe/Paris")
def fmt_deg(v: float) -> str:
s = f"{float(v):.1f}".replace(".", ","); return f"{s}°C"
def now_paris() -> dt.datetime:
return dt.datetime.now(tz=PARIS)
def build_alert_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
when = when or now_paris()
subject = f"[ALERTE {site}] {sonde} au-dessus du seuil"
lines = [
subject + ":",
f"Sonde: {sonde}",
f"Température: {fmt_deg(temp)} (seuil {fmt_deg(seuil)})",
f"Site: {site}",
f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}"
]
txt = "\n".join(lines)
return subject, txt, txt
def build_ok_text(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None):
when = when or now_paris()
subject = f"[OK {site}] {sonde} revenue normale"
lines = [
subject + ":",
f"Sonde: {sonde}",
f"Température: {fmt_deg(temp)} <= seuil {fmt_deg(seuil)}",
f"Site: {site}",
f"Heure: {when.strftime('%Y-%m-%d %H:%M:%S')}"
]
txt = "\n".join(lines)
return subject, txt, txt
def build_client_alert_sms(site: str, sonde: str, temp: float, seuil: float, when: dt.datetime | None = None) -> str:
when = when or now_paris()
# Court, 1 ligne; accents/° nettoyés par normaliser_sms
return f"ALERTE CLIENT {sonde}: T={fmt_deg(temp)} > S={fmt_deg(seuil)} H:{when.strftime('%H:%M')}"
# ========= Gyrophare MQTT =========
class MQTTPublisher:
def __init__(self, site: str):
self.enabled = bool(_mqtt_ok)
self.site = site
self.topic = (
os.getenv(f"GYRO_MQTT_TOPIC_{site}") or
os.getenv(f"GYRO_MQTT_TOPIC_{site.upper()}") or
os.getenv("GYRO_MQTT_TOPIC") or
f"Sondes/{site}/Gyro/cmd"
)
self.last_state: bool | None = None
if not self.enabled:
log.info("Gyro MQTT désactivé (paho-mqtt absent).")
return
if not self.topic:
log.warning("Topic MQTT manquant pour %s (GYRO_MQTT_TOPIC_%s)", site, site)
self.enabled = False
return
host = os.getenv("MQTT_HOST", "localhost")
port = int(os.getenv("MQTT_PORT", "1883"))
user = os.getenv("MQTT_USER")
pwd = os.getenv("MQTT_PASS")
tls = (os.getenv("MQTT_TLS", "0") == "1")
# --- Création du client MQTT : compatible paho 1.x et 2.x ---
cbver = getattr(mqtt, "CallbackAPIVersion", None)
if cbver is not None:
api_v = (
getattr(cbver, "VERSION2", None)
or getattr(cbver, "V5", None)
or getattr(cbver, "v5", None)
or getattr(cbver, "V311", None)
)
try:
self.client = mqtt.Client(callback_api_version=api_v) if api_v else mqtt.Client()
except TypeError:
self.client = mqtt.Client()
else:
self.client = mqtt.Client()
# ------------------------------------------------------------
if user and pwd:
self.client.username_pw_set(user, pwd)
if tls:
self.client.tls_set()
try:
# Attacher le callback avant de s'abonner
self.client.on_message = self._on_message
self.client.connect(host, port, keepalive=30)
# Abonnements (depuis env ou valeurs par défaut raisonnables)
subs_env = (
os.getenv(f"GYRO_MQTT_SUB_{site}") or
os.getenv(f"GYRO_MQTT_SUB_{site.upper()}") or
os.getenv("GYRO_MQTT_SUB") or
""
)
subs = [t.strip() for t in subs_env.split(",") if t.strip()]
if not subs:
subs = [
self.topic, # ex: Sondes/Saclay/Gyro/cmd
f"Sondes/{site}/Gyro/#",
f"{site}/Gyro/#",
"Gyro/#",
]
for t in subs:
try:
self.client.subscribe(t, qos=2)
log.info("MQTT subscribe: %s", t)
except Exception as e:
log.warning("Subscribe échoué (%s): %s", t, e)
self.client.loop_start()
log.info("MQTT connecté (%s:%s), topic=%s", host, port, self.topic)
except Exception as e:
log.exception("MQTT connexion impossible: %s", e)
self.enabled = False
# --- Callback réception MQTT ---
def _on_message(self, client, userdata, msg):
lieu = self.site
topic = msg.topic
payload_raw = msg.payload.decode(errors="ignore").strip()
upper = payload_raw.upper()
# 1) Évènements gyrophare
if upper in ("ON", "OFF") or "gyro" in topic.lower() or "gyrophare" in topic.lower():
etat = upper if upper in ("ON", "OFF") else ("ON" if "on" in upper else "OFF")
try:
if should_insert_gyro(lieu, etat):
insert_gyro_log(
lieu=lieu,
etat=etat,
topic=topic,
payload_raw=payload_raw,
qos=getattr(msg, "qos", None),
retained=getattr(msg, "retain", None),
when=now_paris()
)
except Exception as e:
log.exception("Insert Gyro échoué: %s", e)
return # ne pas poursuivre vers un parse température ici
# 2) Pas du gyro → ignorer ici (la collecte T° est gérée ailleurs)
try:
float(payload_raw.replace(",", "."))
except ValueError:
log.debug("Payload non géré (ni gyro ni nombre): %s %s", topic, payload_raw)
def set(self, on: bool):
if not self.enabled:
return
if self.last_state is not None and self.last_state == on:
return
payload = "ON" if on else "OFF"
try:
r = self.client.publish(self.topic, payload=payload, qos=2, retain=True)
try:
r.wait_for_publish(timeout=3)
except Exception:
pass
if getattr(r, 'rc', 0) != 0:
log.warning("MQTT publish rc=%s (topic=%s)", getattr(r, 'rc', None), self.topic)
else:
log.info("Gyro %s -> %s (MQTT)", self.site, payload.upper())
# Enregistrer en base l'événement gyro
try:
insert_gyro_log(
lieu=self.site,
etat=payload,
topic=self.topic,
payload_raw=payload,
qos=2,
retained=1 if getattr(r, 'is_published', lambda: False)() else None,
when=now_paris()
)
except Exception as e:
log.exception("Insert événement gyro en base a échoué: %s", e)
self.last_state = on
except Exception as e:
log.exception("MQTT publish erreur: %s", e)
# ========= Contrôleur Gyro réactif =========
import enum, threading
class _GyroState(enum.Enum):
IDLE = 0
PULSE_ON = 1
COOLDOWN = 2
class GyroPulseController:
"""
Boucle rapide indépendante :
- MODE CONTINU (défaut) : ON tant que lalarme persiste, OFF quand normal confirmé.
- MODE PULSE : ON (PULSE_SEC) puis OFF (COOLDOWN_SEC), tant que lalarme persiste.
Ajouts :
- SMS ALERTE immédiat à lallumage
- SMS OK immédiat à lextinction (activé par défaut)
"""
def __init__(self, site: str, beacon, notifier, *,
check_sec: int = int(os.getenv("GYRO_CHECK_SEC", "20")),
pulse_sec: int = int(os.getenv("GYRO_PULSE_SEC", "60")),
cooldown_sec: int = int(os.getenv("GYRO_COOLDOWN_SEC", "600")),
normal_confirm: int = int(os.getenv("GYRO_NORMAL_CONFIRM", "2"))):
self.site = site
self.beacon = beacon
self.notifier = notifier
self.check_sec = check_sec
self.pulse_sec = pulse_sec
self.cooldown_sec = cooldown_sec
self.normal_confirm = normal_confirm
self.state = _GyroState.IDLE
self._t_pulse_end = 0.0
self._t_cooldown_end = 0.0
self._normal_count = 0
self._stop = threading.Event()
self._thread = None
self._current = None # dernier état effectif
# Anti-spam SMS & SMS OK activé par défaut
self._last_sms: dict[str, float] = {} # {sonde: ts dernier envoi}
self._sms_min_sec = int(os.getenv("ALERT_SMS_COOLDOWN_SEC") or os.getenv("GYRO_SMS_MIN_SEC", "120"))
self._send_ok = (os.getenv("ALERT_OK_SMS_GYRO", "1") == "1")
# Conserver le dernier déclencheur (pour SMS OK)
self._last_trigger: tuple[str, float, float] | None = None # (sonde, temp, seuil)
def _set_gyro(self, on: bool):
if self._current is not on:
self.beacon.set(on)
self._current = on
def start(self):
if self._thread and self._thread.is_alive():
return
self._stop.clear()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
log.info("GyroPulseController démarré (site=%s, check=%ss, pulse=%ss, cooldown=%ss, confirm=%d)",
self.site, self.check_sec, self.pulse_sec, self.cooldown_sec, self.normal_confirm)
def stop(self):
self._stop.set()
def _sms_can_send(self, sonde: str) -> bool:
t = time.time()
last = self._last_sms.get(sonde, 0.0)
if (t - last) >= self._sms_min_sec:
self._last_sms[sonde] = t
return True
return False
def _send_alert_sms(self, trigger: tuple[str, float, float] | None):
if not _env_bool("ALERT_INTERNAL_SMS_ENABLED", True):
return
if not trigger:
return
sonde, temp, seuil = trigger
if self._sms_can_send(sonde):
_, sms_text, _ = build_alert_text(self.site, sonde, temp, seuil, when=now_paris())
self.notifier.send_sms(sms_text)
def _send_ok_sms_from_last_trigger(self):
if not _env_bool("ALERT_OK_SMS_GYRO", True):
return
if not self._send_ok or not self._last_trigger:
return
sonde, _temp_prev, seuil = self._last_trigger
# Température courante pour le SMS OK
rows = lire_sondes_depuis_db(self.site)
curr_temp = None
for r in rows:
if str(r["Sonde"]) == sonde:
curr_temp = float(r["Temperature"]); break
if curr_temp is None:
curr_temp = seuil - 0.1 # fallback léger
if self._sms_can_send(sonde):
_, sms_text, _ = build_ok_text(self.site, sonde, curr_temp, seuil, when=now_paris())
self.notifier.send_sms(sms_text)
self._last_trigger = None # reset
def _is_alarm_now(self) -> tuple[bool, tuple[str, float, float] | None]:
last_rows = lire_sondes_depuis_db(self.site)
cfg = lire_cfg_chambres(self.site)
return compute_site_alarm(last_rows, cfg, hysteresis=float(os.getenv("GYRO_HYSTERESIS", "0.0")))
def _run(self):
while not self._stop.is_set():
now = time.time()
try:
active, trigger = self._is_alarm_now()
except Exception as e:
log.exception("Gyro fast-loop: erreur lecture état: %s", e)
active, trigger = (False, None)
if self.state == _GyroState.IDLE:
if active:
self._set_gyro(True)
self._t_pulse_end = now + self.pulse_sec
self._normal_count = 0
self.state = _GyroState.PULSE_ON
self._last_trigger = trigger
if trigger:
s, t, se = trigger
log.info("Gyro → ON déclenché par %s: %.2f > %.2f (mode %s)",
s, t, se, "CONTINU" if os.getenv("GYRO_MODE_CONTINUOUS", "1") == "1" else "PULSE")
# SMS alerte immédiat (optionnel)
if os.getenv("ALERT_INTERNAL_SMS_ENABLED", "0") == "1":
self._send_alert_sms(trigger)
elif self.state == _GyroState.PULSE_ON:
if not active:
self._normal_count += 1
if self._normal_count >= self.normal_confirm:
self._set_gyro(False)
self.state = _GyroState.IDLE
self._normal_count = 0
log.info("Gyro → OFF (retour à la normale confirmé)")
# SMS OK immédiat
if os.getenv("ALERT_OK_SMS_GYRO", "0") == "1":
self._send_ok_sms_from_last_trigger()
else:
self._normal_count = 0
if os.getenv("GYRO_MODE_CONTINUOUS", "1") != "1":
if now >= self._t_pulse_end:
self._set_gyro(False)
self._t_cooldown_end = now + self.cooldown_sec
self.state = _GyroState.COOLDOWN
log.info("Gyro → OFF, cooldown %ss (alerte persiste)", self.cooldown_sec)
elif self.state == _GyroState.COOLDOWN:
if not active:
self._normal_count += 1
if self._normal_count >= self.normal_confirm:
self.state = _GyroState.IDLE
self._normal_count = 0
log.info("Gyro: retour IDLE (plus dalerte)")
else:
self._normal_count = 0
if now >= self._t_cooldown_end:
self._set_gyro(True)
self._t_pulse_end = now + self.pulse_sec
self.state = _GyroState.PULSE_ON
log.info("Gyro → ON (re-pulse)")
time.sleep(self.check_sec)
# ========= Notifs haut-niveau =========
notifier = Notifier()
beacon = MQTTPublisher(SITE)
def notifier_sur_depassement(site: str, sonde: str, temp: float, seuil: float):
"""
MAIL quand l'alerte est confirmée (≥30 min) et ouverte en base.
+ SMS CLIENT couplé (ALERTE_CLIENT_{SITE}).
(Le SMS d'alerte interne est envoyé immédiatement par la boucle gyro.)
"""
subject, _sms_text, email_body = build_alert_text(site, sonde, temp, seuil)
notifier.send_email(subject, email_body) # MAIL (≥30 min)
# SMS client couplé au mail 30 min
if os.getenv("ALERT_SMS_CLIENT_ENABLED", "1") == "1":
client_msg = build_client_alert_sms(site, sonde, temp, seuil)
notifier.send_sms_client(client_msg, tag=f"client-{SITE.lower()}")
def notifier_acquittement(site: str, sonde: str, temp: float, seuil: float):
"""
MAIL lorsque lalerte est acquittée en base.
(Le SMS "OK" est envoyé immédiatement par la boucle gyro.)
"""
subject, sms_text, email_body = build_ok_text(site, sonde, temp, seuil)
notifier.send_email(subject, email_body) # mail d'acquittement
# Optionnel: SMS "OK" côté cycle si souhaité
if os.getenv("ALERT_OK_SMS", "0") == "1":
notifier.send_sms(sms_text)
# ========= Cycle & boucle =========
def run_monitor_cycle(site: str = SITE):
# 1) Lecture dernières mesures + config
last_rows = lire_sondes_depuis_db(site)
cfg = lire_cfg_chambres(site)
# 2) Info: état instantané (le gyro est piloté par la boucle rapide)
try:
gyro_on, trigger = compute_site_alarm(last_rows, cfg, hysteresis=float(os.getenv("GYRO_HYSTERESIS", "0.0")))
if trigger:
s, t, se = trigger
log.info("Dépassement détecté (gyro géré par boucle rapide) : %s %.2f > %.2f", s, t, se)
else:
log.info("Aucun dépassement au moment du cycle")
except Exception as e:
log.exception("Erreur calcul alarme (info): %s", e)
# 3) Alertes "officielles" temporisées (≥30 min) → mail + SMS client
seuils = {s: meta["temp_max"] for s, meta in cfg.items() if meta.get("active", False)}
for r in last_rows:
nom = str(r["Sonde"])
temp = float(r["Temperature"])
if nom not in seuils:
continue # sonde non gérée dans Chambres_froides → ignorée
seuil = float(seuils[nom])
now_ = now_paris()
if temp > seuil:
if depassement_depuis_30min(site, nom, seuil):
conn = None
try:
conn = get_db()
if open_alert(conn, f"Alertes_{site}", nom, now_):
notifier_sur_depassement(site, nom, temp, seuil)
finally:
if conn:
conn.close()
else:
conn = None
try:
conn = get_db()
if close_alert(conn, f"Alertes_{site}", nom):
notifier_acquittement(site, nom, temp, seuil)
finally:
if conn:
conn.close()
def run_monitor_loop(site: str = SITE, period_sec: int = 300):
log.info("%s démarré (site=%s, période=%ss) ✅", PROGRAM_NAME, site, period_sec)
# Démarrage du contrôleur gyro rapide (thread) + notifier pour SMS immédiats
try:
global _gyro_controller
_gyro_controller = GyroPulseController(site, beacon, notifier)
_gyro_controller.start()
except Exception as e:
log.exception("Impossible de démarrer le GyroPulseController: %s", e)
while True:
t0 = time.time()
try:
run_monitor_cycle(site)
except Exception as err:
log.exception("Erreur cycle monitoring: %s", err)
time.sleep(max(0, period_sec - (time.time() - t0)))
# ========= CLI =========
if __name__ == "__main__":
import argparse
p = argparse.ArgumentParser(description=PROGRAM_NAME)
p.add_argument("--period", type=int, default=300)
p.add_argument("--test-sms", action="store_true")
p.add_argument("--test-mail", action="store_true")
p.add_argument("--test-alert", action="store_true")
p.add_argument("--test-ok", action="store_true")
p.add_argument("--once", action="store_true")
args = p.parse_args()
if args.test_sms:
n = Notifier()
n.send_sms("TEST DOMO91 (transactionnel)")
elif args.test_mail:
notifier.send_email(f"[TEST {SITE}] Mail", "OK")
elif args.test_alert:
notifier_sur_depassement(SITE, "Congelateur", -14.5, -15.0)
elif args.test_ok:
notifier_acquittement(SITE, "Congelateur", -15.2, -15.0)
else:
if args.once:
run_monitor_cycle(SITE)
else:
run_monitor_loop(SITE, period_sec=args.period)

20
app/Test_Chat.py Normal file
View File

@@ -0,0 +1,20 @@
import json
import requests
WEBHOOK_URL = "https://192.168.1.250/webapi/entry.cgi?api=SYNO.Chat.External&method=incoming&version=2&token=UN7nhD70vrhrHFh1VeDdOpsklIHiIFRop2qB7b6YusMEY3clY3R8CXe4hFzz4KKc"
payload = {
"text": "✅ Test VPS vers Synology Chat"
}
try:
r = requests.post(
WEBHOOK_URL,
data={"payload": json.dumps(payload, ensure_ascii=False)},
timeout=10,
verify=False
)
print("HTTP:", r.status_code)
print("Réponse:", r.text)
except Exception as e:
print("Erreur:", repr(e))

View File

@@ -1,34 +1,74 @@
#!/usr/bin/env python3
# Surveillance des réceptions de données dans les tables (par site) + mail d'alerte / retour à la normale
# -*- coding: utf-8 -*-
"""
Surveillance des réceptions de données dans les tables (par site)
+ alerte mail
+ alerte Synology Chat
+ retour à la normale
"""
from __future__ import annotations
from datetime import datetime, timedelta
from pathlib import Path
import json
import logging
import os
import sys
import logging
import tempfile
import mysql.connector
from contextlib import closing
import mysql.connector # important pour cibler les exceptions MySQL
from typing import cast
from mysql.connector.connection import MySQLConnection
from mysql.connector.cursor import MySQLCursor
import requests
from dotenv import load_dotenv
import utils_db
from utils_mail import envoyer_mail
# -------------------- PATHS (portable Windows/Linux) --------------------
APP_DIR = os.path.dirname(os.path.abspath(__file__)) # .../Gestion_sondes/app
ROOT_DIR = os.path.abspath(os.path.join(APP_DIR, "..")) # .../Gestion_sondes
# ============================================================
# PATHS / ENV
# ============================================================
ENV_PATH = os.path.join(ROOT_DIR, ".env")
APP_DIR = Path(__file__).resolve().parent # .../Gestion_sondes/app
ROOT_DIR = APP_DIR.parent # .../Gestion_sondes
ENV_PATH = ROOT_DIR / ".env"
LOG_DIR = ROOT_DIR / "Logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR = os.path.join(ROOT_DIR, "Logs")
os.makedirs(LOG_DIR, exist_ok=True)
# Etat persistant dans le projet (évite les surprises de /tmp)
STATE_DIR = APP_DIR / "state"
STATE_DIR.mkdir(parents=True, exist_ok=True)
log_filename = os.path.join(LOG_DIR, datetime.now().strftime("surveillance_%Y-%m-%d.log"))
load_dotenv(ENV_PATH, override=True)
# -------------------- LOGGING (UTF-8 + fichier + console) --------------------
# ============================================================
# CONFIGURATION
# ============================================================
TABLES = ["Saclay", "Meudon"]
TABLES_SET = set(TABLES)
DELAI_MINUTES = 15
RAPPEL_HEURES = 6
SYNOLOGY_CHAT_WEBHOOK_URL = os.getenv("SYNOLOGY_CHAT_WEBHOOK_URL", "").strip()
SYNOLOGY_CHAT_VERIFY_SSL = os.getenv("SYNOLOGY_CHAT_VERIFY_SSL", "true").strip().lower() in (
"1", "true", "yes", "on"
)
# ============================================================
# LOGGING
# ============================================================
log_filename = LOG_DIR / f"surveillance_{datetime.now():%Y-%m-%d}.log"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
@@ -36,191 +76,329 @@ logging.basicConfig(
logging.FileHandler(log_filename, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
force=True, # Python 3.8+
force=True,
)
# Fix Windows/PyCharm : forcer l'encodage UTF-8 sur le StreamHandler (évite UnicodeEncodeError cp1252)
for h in logging.getLogger().handlers:
if isinstance(h, logging.StreamHandler):
for handler in logging.getLogger().handlers:
if isinstance(handler, logging.StreamHandler):
try:
h.stream.reconfigure(encoding="utf-8", errors="replace")
handler.stream.reconfigure(encoding="utf-8", errors="replace")
except Exception:
# Certains streams/IDEs ne supportent pas reconfigure
pass
# -------------------- ENV --------------------
# override=True : si cron/supervisor/IDE a des variables vides, le .env reprend la main
load_dotenv(ENV_PATH, override=True)
# ============================================================
# ETAT DES ALERTES
# ============================================================
def state_file(site: str) -> Path:
return STATE_DIR / f"{site}.json"
# -------------------- PARAMETRES --------------------
# Mets ici les tables réelles à surveiller (noms exacts des tables MySQL)
tables = ["Saclay", "Meudon"]
def read_state(site: str) -> dict:
sf = state_file(site)
if not sf.exists():
return {
"status": "ok", # ok | alerting
"first_alert_at": None,
"last_alert_at": None,
"last_data_at": None,
}
DELAI_MINUTES = 15
RAPPEL_HEURES = 6
# State portable (Windows/Linux)
STATE_DIR = os.path.join(tempfile.gettempdir(), "surveillance_states")
os.makedirs(STATE_DIR, exist_ok=True)
TABLES_SET = set(tables) # whitelist simple
# -------------------- STATE MANAGEMENT --------------------
def _state_file(site: str) -> str:
return os.path.join(STATE_DIR, f"{site}.state")
def _read_last_alert(site: str) -> datetime | None:
sf = _state_file(site)
if not os.path.exists(sf):
return None
try:
with open(sf, "r", encoding="utf-8") as f:
raw = f.read().strip()
return datetime.fromisoformat(raw)
except (OSError, ValueError) as e:
logging.warning(f"Etat corrompu pour {site} ({sf}) : {e}. On ignorera cet état.")
return None
data = json.loads(sf.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError("format JSON invalide")
return {
"status": data.get("status", "ok"),
"first_alert_at": data.get("first_alert_at"),
"last_alert_at": data.get("last_alert_at"),
"last_data_at": data.get("last_data_at"),
}
except Exception as e:
logging.warning(f"Etat corrompu pour {site} ({sf}) : {e}. Etat réinitialisé.")
return {
"status": "ok",
"first_alert_at": None,
"last_alert_at": None,
"last_data_at": None,
}
def mark_alert_sent(site: str, when: datetime | None = None) -> None:
"""Écrit/actualise l'état *après* un envoi mail réussi."""
sf = _state_file(site)
when = when or datetime.now()
def write_state(site: str, state: dict) -> None:
sf = state_file(site)
try:
with open(sf, "w", encoding="utf-8") as f:
f.write(when.isoformat())
sf.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
except OSError as e:
logging.warning(f"Impossible d'écrire l'état {sf} : {e} (l'anti-spam sera moins efficace).")
logging.warning(f"Impossible d'écrire l'état {sf} : {e}")
def dt_to_iso(value) -> str | None:
if value is None:
return None
if isinstance(value, datetime):
return value.isoformat()
return str(value)
def iso_to_dt(value: str | None) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(value)
except ValueError:
return None
def enter_alert_state(site: str, last_update) -> None:
state = read_state(site)
now = datetime.now()
state["status"] = "alerting"
state["first_alert_at"] = state["first_alert_at"] or now.isoformat()
state["last_alert_at"] = now.isoformat()
state["last_data_at"] = dt_to_iso(last_update)
write_state(site, state)
def update_last_data(site: str, last_update) -> None:
state = read_state(site)
state["last_data_at"] = dt_to_iso(last_update)
write_state(site, state)
def clear_state(site: str) -> None:
write_state(site, {
"status": "ok",
"first_alert_at": None,
"last_alert_at": None,
"last_data_at": None,
})
def is_alerting(site: str) -> bool:
return read_state(site).get("status") == "alerting"
def should_send_alert(site: str) -> bool:
"""
Retourne True si on doit envoyer une alerte maintenant (première fois ou rappel).
IMPORTANT : ne modifie PAS l'état ici.
L'état est mis à jour uniquement après envoi réussi (mark_alert_sent).
Règle :
- 1ère alerte dès que l'absence de données dépasse DELAI_MINUTES
- puis 1 rappel toutes les RAPPEL_HEURES tant que le défaut persiste
"""
last_alert = _read_last_alert(site)
state = read_state(site)
if state.get("status") != "alerting":
return True
last_alert = iso_to_dt(state.get("last_alert_at"))
if last_alert is None:
return True
return (datetime.now() - last_alert) >= timedelta(hours=RAPPEL_HEURES)
def clear_state(site: str) -> None:
sf = _state_file(site)
# ============================================================
# NOTIFICATIONS
# ============================================================
def envoyer_chat(titre: str, message: str) -> None:
if not SYNOLOGY_CHAT_WEBHOOK_URL:
logging.warning("Webhook Synology Chat non configuré : notification Chat ignorée.")
return
texte = f"{titre}\n{message}"
payload = {"text": texte}
response = requests.post(
SYNOLOGY_CHAT_WEBHOOK_URL,
data={"payload": json.dumps(payload, ensure_ascii=False)},
timeout=10,
verify=SYNOLOGY_CHAT_VERIFY_SSL,
)
response.raise_for_status()
try:
if os.path.exists(sf):
os.remove(sf)
except OSError as e:
logging.warning(f"Impossible de supprimer l'état {sf} : {e}")
rep_json = response.json()
if isinstance(rep_json, dict) and rep_json.get("success") is False:
raise RuntimeError(f"Synology Chat a refusé le message : {rep_json}")
except ValueError:
pass
logging.info("💬 Notification Synology Chat envoyée.")
# -------------------- MAIN --------------------
def envoyer_notifications(sujet: str, message: str) -> None:
"""
Envoie mail + chat.
Lève une erreur si au moins un des deux canaux échoue.
"""
erreurs: list[str] = []
try:
envoyer_mail(sujet, message)
logging.info("📧 Mail envoyé.")
except Exception as e:
erreurs.append(f"mail: {e}")
try:
envoyer_chat(sujet, message)
except Exception as e:
erreurs.append(f"chat: {e}")
if erreurs:
raise RuntimeError(" | ".join(erreurs))
# ============================================================
# ACCES BASE
# ============================================================
def get_last_update(cursor, table: str) -> datetime | None:
cursor.execute(f"SELECT MAX(Date) FROM `{table}`")
row = cursor.fetchone()
if not row or row[0] is None:
return None
value = row[0]
if isinstance(value, datetime):
return value
return None
# ============================================================
# TRAITEMENT DES TABLES
# ============================================================
def traiter_table(cursor, table: str, limite: datetime,
defauts_en_cours: list[str],
alertes_envoyees: list[str],
erreurs_sql: list[str]) -> None:
"""
Gère la surveillance d'une table.
"""
if table not in TABLES_SET:
logging.warning(f"Table ignorée (non whitelistée) : {table}")
return
try:
last_update = get_last_update(cursor, table)
except mysql.connector.Error as e:
erreurs_sql.append(table)
logging.error(f"Erreur SQL sur {table} : {e}")
if should_send_alert(table):
try:
envoyer_notifications(
f"⚠️ ALERTE : erreur SQL sur {table}",
f"Erreur SQL détectée sur la table {table}.\n\nDétail :\n{e}"
)
enter_alert_state(table, None)
alertes_envoyees.append(f"{table} (SQL)")
except Exception as notif_e:
logging.error(f"Impossible d'envoyer les notifications SQL pour {table} : {notif_e}")
else:
logging.info(f"{table} : erreur SQL déjà signalée, rappel dans {RAPPEL_HEURES}h.")
return
# Cas défaut : aucune donnée ou donnée trop ancienne
if (last_update is None) or (last_update < limite):
defauts_en_cours.append(table)
if should_send_alert(table):
logging.warning(f"⚠️ {table} en défaut (dernier relevé : {last_update})")
try:
envoyer_notifications(
f"⚠️ ALERTE : {table} absence de relevés",
f"Pas de relevés depuis plus de {DELAI_MINUTES} min.\nDernier relevé : {last_update}"
)
enter_alert_state(table, last_update)
alertes_envoyees.append(f"{table} (dernier : {last_update})")
except Exception as notif_e:
logging.error(f"Erreur envoi notifications alerte pour {table} : {notif_e}")
else:
logging.info(f"{table} déjà signalé, rappel dans {RAPPEL_HEURES}h.")
return
# Cas normal : de nouvelles données sont présentes
was_alerting = is_alerting(table)
previous_state = read_state(table)
previous_last_data = previous_state.get("last_data_at")
current_last_data = dt_to_iso(last_update)
# on mémorise la dernière donnée vue, même en état normal
update_last_data(table, last_update)
# retour à la normale seulement si on sort réellement d'un état d'alerte
# et qu'une donnée plus récente est arrivée
if was_alerting and current_last_data != previous_last_data:
message = f"{table} : relevés à nouveau reçus (dernier : {last_update}). Situation normale."
try:
envoyer_notifications(
f"✅ OK : {table} relevés reçus",
message
)
clear_state(table)
logging.info(f"📩 Retour à la normale envoyé pour {table}.")
except Exception as notif_e:
logging.error(f"Erreur envoi notifications retour à la normale pour {table} : {notif_e}")
else:
logging.info(f"{table} OK (dernier relevé : {last_update})")
# ============================================================
# MAIN
# ============================================================
def main() -> None:
limite = datetime.now() - timedelta(minutes=DELAI_MINUTES)
# pour logs plus clairs
defauts_en_cours: list[str] = [] # défaut détecté (même si déjà signalé)
alertes_envoyees: list[str] = [] # alertes envoyées à CE run (pour mail groupé)
erreurs_sql: list[str] = [] # SQL KO sur tables
defauts_en_cours: list[str] = []
alertes_envoyees: list[str] = []
erreurs_sql: list[str] = []
# 1) Connexion MySQL (une seule fois)
try:
with closing(utils_db.connect_to_mysql()) as cnx, closing(cnx.cursor()) as cursor:
cnx = cast(MySQLConnection, utils_db.connect_to_mysql())
# 2) Surveillance par table
for table in tables:
if table not in TABLES_SET:
logging.warning(f"Table ignorée (non whitelistée) : {table}")
continue
# 2a) Lecture de la dernière date
try:
cursor.execute(f"SELECT MAX(Date) FROM `{table}`")
row = cursor.fetchone()
last_update = row[0] if row else None
except mysql.connector.Error as e:
msg = f"Erreur SQL sur {table} : {e}"
erreurs_sql.append(table)
logging.error(msg)
# Alerte SQL (anti-spam)
if should_send_alert(table):
try:
envoyer_mail(
f"⚠️ ALERTE : erreur SQL sur {table} (voir logs).",
f"Erreur SQL détectée sur la table {table}.\n\nDétail:\n{e}"
)
mark_alert_sent(table)
alertes_envoyees.append(f"{table} (SQL)")
except Exception as mail_e:
logging.error(f"Impossible d'envoyer le mail d'erreur SQL pour {table} : {mail_e}")
else:
logging.info(f"{table} : erreur SQL déjà signalée, rappel dans {RAPPEL_HEURES}h.")
continue
# 2b) Logique métier
if (last_update is None) or (last_update < limite):
defauts_en_cours.append(table)
if should_send_alert(table):
logging.warning(f"⚠️ {table} en défaut (dernier relevé : {last_update})")
# On envoie le mail tout de suite (alerte individuelle)
try:
envoyer_mail(
f"⚠️ ALERTE : {table} absence de relevés",
f"Pas de relevés depuis > {DELAI_MINUTES} min.\nDernier relevé : {last_update}"
)
mark_alert_sent(table)
alertes_envoyees.append(f"{table} (dernier : {last_update})")
except Exception as mail_e:
# pas de mark_alert_sent => on retentera au prochain run
logging.error(f"Erreur envoi mail alerte pour {table} : {mail_e}")
else:
logging.info(f"{table} déjà signalé, rappel dans {RAPPEL_HEURES}h.")
else:
# OK => si on avait un état, on envoie un "retour à la normale"
if os.path.exists(_state_file(table)):
message = f"{table} : relevés à nouveau reçus (dernier : {last_update}). Situation normale."
try:
envoyer_mail(f"✅ OK : {table} relevés reçus", message)
clear_state(table)
logging.info(f"📩 Retour à la normale envoyé pour {table}.")
except Exception as mail_e:
# on conserve le state => on retentera le retour à la normale
logging.error(f"Erreur envoi mail retour à la normale pour {table} : {mail_e}")
else:
logging.info(f"{table} OK (dernier relevé : {last_update})")
with closing(cnx):
cursor = cast(MySQLCursor, cnx.cursor())
with closing(cursor):
for table in TABLES:
traiter_table(
cursor=cursor,
table=table,
limite=limite,
defauts_en_cours=defauts_en_cours,
alertes_envoyees=alertes_envoyees,
erreurs_sql=erreurs_sql,
)
except mysql.connector.Error as e:
logging.error(f"MySQL KO : {e}")
try:
envoyer_mail(
"⚠️ ALERTE : Base MySQL inaccessible (surveillance impossible).",
envoyer_notifications(
"⚠️ ALERTE : Base MySQL inaccessible",
"Connexion MySQL impossible : la surveillance des relevés ne peut pas sexécuter."
)
except Exception as mail_e:
logging.error(f"Impossible d'envoyer le mail MySQL KO : {mail_e}")
except Exception as notif_e:
logging.error(f"Impossible d'envoyer les notifications MySQL KO : {notif_e}")
return
# 3) Logs de synthèse (fidèles à la réalité)
if alertes_envoyees:
logging.info("📧 Mail(s) envoyé(s) : " + ", ".join(alertes_envoyees))
logging.info("📧/💬 Notification(s) envoyée(s) : " + ", ".join(alertes_envoyees))
elif defauts_en_cours or erreurs_sql:
# défauts en cours mais déjà signalés (ou erreurs SQL déjà signalées)
bloc = []
if defauts_en_cours:
bloc.append("défaut(s) relevés: " + ", ".join(defauts_en_cours))
bloc.append("défaut(s) relevés : " + ", ".join(defauts_en_cours))
if erreurs_sql:
bloc.append("erreur(s) SQL: " + ", ".join(erreurs_sql))
logging.info("⚠️ " + " | ".join(bloc) + " (déjà signalé / pas de mail envoyé à ce run)")
bloc.append("erreur(s) SQL : " + ", ".join(erreurs_sql))
logging.info("⚠️ " + " | ".join(bloc) + " (déjà signalé / pas de notification envoyée à ce run)")
else:
logging.info("👍 Tout est OK, aucun Mail envoyé.")
logging.info("👍 Tout est OK, aucune notification envoyée.")
if __name__ == "__main__":

View File

@@ -9,23 +9,26 @@ mkdir -p "$LOG_DIR" "$BACKUP_DIR"
LOG_FILE="$LOG_DIR/backup_$DATE.log"
exec > >(tee -a "$LOG_FILE") 2>&1
# Verrou anti-doublon (facultatif mais prudent)
# Verrou anti-doublon
exec 9>/tmp/backup_mysql.lock
flock -n 9 || { echo "🔒 Un autre backup est en cours. Abandon."; exit 1; }
BACKUP_FILE="$BACKUP_DIR/mysql_backup_$DATE.sql"
# Cible NAS (alias dans ~/.ssh/config)
NAS_HOST="10.8.0.1"
# Cible NAS via WireGuard / LAN
NAS_HOST="192.168.1.250" # à adapter avec l'IP locale réelle du NAS
NAS_PORT="4422" # mettre 4422 seulement si DSM écoute réellement sur 4422 en local
NAS_USER="Michel"
NAS_DIR="/volume1/VPS/Gravelines"
SSH_OPTS="-i /home/debian/.ssh/id_ed25519 -p 4422 \
SSH_KEY="/home/debian/.ssh/id_ed25519"
SSH_OPTS="-i $SSH_KEY -p $NAS_PORT \
-o BatchMode=yes -o PreferredAuthentications=publickey \
-o PasswordAuthentication=no -o PubkeyAuthentication=yes \
-o StrictHostKeyChecking=accept-new -o ConnectTimeout=10 \
-o ServerAliveInterval=30 -o ServerAliveCountMax=2"
# Chemin credentials MySQL (recommandé)
# Credentials MySQL
MYSQL_DEFAULTS="/home/debian/.my.cnf"
# PATH minimal pour cron
@@ -42,7 +45,7 @@ if ! ssh $SSH_OPTS "$NAS_USER@$NAS_HOST" "mkdir -p '$NAS_DIR' && test -w '$NAS_D
exit 20
fi
# 2) Dump MySQL (format .sql, options sûres)
# 2) Dump MySQL
echo "🔷 Dump MySQL…"
if [[ -f "$MYSQL_DEFAULTS" ]]; then
DUMP="mysqldump --defaults-file=$MYSQL_DEFAULTS --all-databases --single-transaction --quick --lock-tables=false --routines --events --triggers"
@@ -50,36 +53,37 @@ else
DUMP="mysqldump --all-databases --single-transaction --quick --lock-tables=false --routines --events --triggers"
fi
# Baisse de priorité pour ne pas gêner la prod
# Baisse de priorité
IONICE="$(command -v ionice >/dev/null 2>&1 && echo 'ionice -c2 -n7' || true)"
NICE="$(command -v nice >/dev/null 2>&1 && echo 'nice -n 10' || true)"
NICE="$(command -v nice >/dev/null 2>&1 && echo 'nice -n 10' || true)"
bash -c "$IONICE $NICE $DUMP > '$BACKUP_FILE'"
# Sanity check
# Vérification locale
if [[ ! -s "$BACKUP_FILE" ]]; then
echo "❌ Fichier de backup vide : $BACKUP_FILE"
exit 21
fi
LOCAL_SIZE=$(stat -c%s "$BACKUP_FILE" 2>/dev/null || wc -c < "$BACKUP_FILE")
echo "✅ Dump OK : $BACKUP_FILE ($LOCAL_SIZE octets)"
# 3) Transfert NAS (SCP à la place de rsync)
# 3) Transfert NAS
scp -O -P 4422 -i /home/debian/.ssh/id_ed25519 \
-o BatchMode=yes -o PreferredAuthentications=publickey -o PasswordAuthentication=no \
-o StrictHostKeyChecking=accept-new \
"$BACKUP_FILE" "Michel@10.8.0.1:$NAS_DIR/"
"$BACKUP_FILE" "$NAS_USER@$NAS_HOST:$NAS_DIR/"
# 4) Vérification taille distante = locale
BASENAME="$(basename "$BACKUP_FILE")"
REMOTE_SIZE=$(ssh -p 4422 -i /home/debian/.ssh/id_ed25519 \
-o BatchMode=yes -o PreferredAuthentications=publickey -o PasswordAuthentication=no \
-o StrictHostKeyChecking=accept-new \
"Michel@10.8.0.1" "wc -c < '$NAS_DIR/$BASENAME'" || echo 0)
"$NAS_USER@$NAS_HOST" "wc -c < '$NAS_DIR/$BASENAME'" || echo 0)
if [[ "$REMOTE_SIZE" != "$LOCAL_SIZE" ]]; then
echo "❌ Taille différente après transfert (local=$LOCAL_SIZE, distant=$REMOTE_SIZE)"
exit 22
fi
echo "✅ Transfert OK → 10.8.0.1:$NAS_DIR/$BASENAME"
echo "✅ Transfert OK → $NAS_HOST:$NAS_DIR/$BASENAME"