Files
Gestion_sondes/app/surveillance_releves.py
2026-05-22 13:36:00 +02:00

508 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Surveillance des réceptions de données dans les tables (par site)
+ alerte mail
+ alerte Synology Chat
+ retour à la normale
"""
from __future__ import annotations
from datetime import datetime, timedelta
from pathlib import Path
import json
import logging
import os
import sys
import re
import mysql.connector
from contextlib import closing
from typing import cast
from mysql.connector.connection import MySQLConnection
from mysql.connector.cursor import MySQLCursor
import requests
from dotenv import load_dotenv
import utils_db
from utils_mail import envoyer_mail
# ============================================================
# PATHS / ENV
# ============================================================
APP_DIR = Path(__file__).resolve().parent # .../Gestion_sondes/app
ROOT_DIR = APP_DIR.parent # .../Gestion_sondes
ENV_PATH = ROOT_DIR / ".env"
LOG_DIR = ROOT_DIR / "Logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Etat persistant dans le projet (évite les surprises de /tmp)
STATE_DIR = APP_DIR / "state"
STATE_DIR.mkdir(parents=True, exist_ok=True)
load_dotenv(ENV_PATH, override=True)
# ============================================================
# CONFIGURATION
# ============================================================
# Table de configuration : seuls les sites avec Actif = 'ON' seront surveillés.
TABLE_SITES_SURVEILLANCE = "Sites_Surveillance"
# Sécurité / secours : utilisé uniquement si la table Sites_Surveillance est absente
# ou si elle ne retourne aucun site actif.
TABLES_FALLBACK = ["Saclay", "Meudon"]
DELAI_MINUTES = 15
RAPPEL_HEURES = 6
def _env_str(name: str, default: str = "") -> str:
return (os.getenv(name, default) or "").strip()
def _env_bool(name: str, default: bool) -> bool:
value = _env_str(name, "1" if default else "0").lower()
return value in ("1", "true", "yes", "on")
# ============================================================
# LOGGING
# ============================================================
log_filename = LOG_DIR / f"surveillance_{datetime.now():%Y-%m-%d}.log"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(log_filename, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
force=True,
)
for handler in logging.getLogger().handlers:
if isinstance(handler, logging.StreamHandler):
try:
handler.stream.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# ============================================================
# ETAT DES ALERTES
# ============================================================
def state_file(site: str) -> Path:
return STATE_DIR / f"{site}.json"
def read_state(site: str) -> dict:
sf = state_file(site)
if not sf.exists():
return {
"status": "ok", # ok | alerting
"first_alert_at": None,
"last_alert_at": None,
"last_data_at": None,
}
try:
data = json.loads(sf.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError("format JSON invalide")
return {
"status": data.get("status", "ok"),
"first_alert_at": data.get("first_alert_at"),
"last_alert_at": data.get("last_alert_at"),
"last_data_at": data.get("last_data_at"),
}
except Exception as e:
logging.warning(f"Etat corrompu pour {site} ({sf}) : {e}. Etat réinitialisé.")
return {
"status": "ok",
"first_alert_at": None,
"last_alert_at": None,
"last_data_at": None,
}
def write_state(site: str, state: dict) -> None:
sf = state_file(site)
try:
sf.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
except OSError as e:
logging.warning(f"Impossible d'écrire l'état {sf} : {e}")
def dt_to_iso(value) -> str | None:
if value is None:
return None
if isinstance(value, datetime):
return value.isoformat()
return str(value)
def iso_to_dt(value: str | None) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(value)
except ValueError:
return None
def enter_alert_state(site: str, last_update) -> None:
state = read_state(site)
now = datetime.now()
state["status"] = "alerting"
state["first_alert_at"] = state["first_alert_at"] or now.isoformat()
state["last_alert_at"] = now.isoformat()
state["last_data_at"] = dt_to_iso(last_update)
write_state(site, state)
def update_last_data(site: str, last_update) -> None:
state = read_state(site)
state["last_data_at"] = dt_to_iso(last_update)
write_state(site, state)
def clear_state(site: str) -> None:
write_state(site, {
"status": "ok",
"first_alert_at": None,
"last_alert_at": None,
"last_data_at": None,
})
def is_alerting(site: str) -> bool:
return read_state(site).get("status") == "alerting"
def should_send_alert(site: str) -> bool:
"""
Règle :
- 1ère alerte dès que l'absence de données dépasse DELAI_MINUTES
- puis 1 rappel toutes les RAPPEL_HEURES tant que le défaut persiste
"""
state = read_state(site)
if state.get("status") != "alerting":
return True
last_alert = iso_to_dt(state.get("last_alert_at"))
if last_alert is None:
return True
return (datetime.now() - last_alert) >= timedelta(hours=RAPPEL_HEURES)
# ============================================================
# NOTIFICATIONS
# ============================================================
def envoyer_chat(site: str, titre: str, message: str) -> None:
webhook = (
_env_str(f"SYNO_CHAT_WEBHOOK_MONITOR_{site}") or
_env_str(f"SYNO_CHAT_WEBHOOK_MONITOR_{site.upper()}") or
_env_str("SYNO_CHAT_WEBHOOK_MONITOR") or
_env_str(f"SYNO_CHAT_WEBHOOK_{site}") or
_env_str(f"SYNO_CHAT_WEBHOOK_{site.upper()}") or
_env_str("SYNO_CHAT_WEBHOOK")
)
if not webhook:
logging.warning(f"Webhook Synology Chat monitor non configuré pour {site} : notification Chat ignorée.")
return
verify_ssl = _env_bool("SYNO_CHAT_VERIFY_SSL", True)
botname = _env_str("SYNO_CHAT_BOTNAME_MONITOR", "Injection données dans tables")
texte = f"{titre}\n{message}"
payload: dict[str, str] = {"text": texte}
if botname:
payload["username"] = botname
response = requests.post(
webhook,
data={"payload": json.dumps(payload, ensure_ascii=False)},
timeout=int(_env_str("SYNO_CHAT_TIMEOUT", "10")),
verify=verify_ssl,
)
response.raise_for_status()
try:
rep_json = response.json()
if isinstance(rep_json, dict) and rep_json.get("success") is False:
raise RuntimeError(f"Synology Chat a refusé le message : {rep_json}")
except ValueError:
pass
logging.info("💬 Notification Synology Chat envoyée pour %s.", site)
def envoyer_notifications(site: str, sujet: str, message: str) -> None:
"""
Envoie mail + chat.
Lève une erreur si au moins un des deux canaux échoue.
"""
erreurs: list[str] = []
try:
envoyer_mail(sujet, message)
logging.info("📧 Mail envoyé.")
except Exception as e:
erreurs.append(f"mail: {e}")
try:
envoyer_chat(site, sujet, message)
except Exception as e:
erreurs.append(f"chat: {e}")
if erreurs:
raise RuntimeError(" | ".join(erreurs))
# ============================================================
# ACCES BASE
# ============================================================
def is_safe_identifier(name: str) -> bool:
"""
Autorise uniquement les noms simples de tables/sites :
lettres, chiffres et underscore.
Cela évite toute injection SQL via un nom de table dynamique.
"""
return bool(re.fullmatch(r"[A-Za-z0-9_]+", name or ""))
def quote_identifier(name: str) -> str:
if not is_safe_identifier(name):
raise ValueError(f"Nom de table/site invalide : {name!r}")
return f"`{name}`"
def get_last_update(cursor, table: str) -> datetime | None:
cursor.execute(f"SELECT MAX(Date) FROM {quote_identifier(table)}")
row = cursor.fetchone()
if not row or row[0] is None:
return None
value = row[0]
if isinstance(value, datetime):
return value
return None
def get_tables_a_surveiller(cursor) -> list[str]:
"""
Lit la table Sites_Surveillance.
Seuls les sites Actif = 'ON' sont surveillés.
Les sites Actif = 'OFF' sont journalisés mais ignorés.
"""
try:
cursor.execute(f"""
SELECT Lieu, Actif, Commentaire
FROM {quote_identifier(TABLE_SITES_SURVEILLANCE)}
ORDER BY Lieu
""")
rows = cursor.fetchall()
except mysql.connector.Error as e:
logging.error(
"Impossible de lire %s : %s. Utilisation du fallback : %s",
TABLE_SITES_SURVEILLANCE,
e,
", ".join(TABLES_FALLBACK),
)
return TABLES_FALLBACK.copy()
tables_actives: list[str] = []
for lieu, actif, commentaire in rows:
lieu = str(lieu).strip()
actif = str(actif).strip().upper()
if not is_safe_identifier(lieu):
logging.warning("Site ignoré dans %s : nom invalide %r", TABLE_SITES_SURVEILLANCE, lieu)
continue
if actif == "ON":
tables_actives.append(lieu)
else:
if commentaire:
logging.info("⏸️ %s ignoré : surveillance OFF (%s)", lieu, commentaire)
else:
logging.info("⏸️ %s ignoré : surveillance OFF", lieu)
if not tables_actives:
logging.warning(
"Aucun site actif dans %s. Utilisation du fallback : %s",
TABLE_SITES_SURVEILLANCE,
", ".join(TABLES_FALLBACK),
)
return TABLES_FALLBACK.copy()
logging.info("Sites surveillés : %s", ", ".join(tables_actives))
return tables_actives
# ============================================================
# TRAITEMENT DES TABLES
# ============================================================
def traiter_table(cursor, table: str, limite: datetime,
tables_autorisees: set[str],
defauts_en_cours: list[str],
alertes_envoyees: list[str],
erreurs_sql: list[str]) -> None:
"""
Gère la surveillance d'une table.
"""
if table not in tables_autorisees:
logging.warning(f"Table ignorée (non autorisée) : {table}")
return
if not is_safe_identifier(table):
logging.warning(f"Table ignorée (nom invalide) : {table}")
return
try:
last_update = get_last_update(cursor, table)
except mysql.connector.Error as e:
erreurs_sql.append(table)
logging.error(f"Erreur SQL sur {table} : {e}")
if should_send_alert(table):
try:
envoyer_notifications(
table,
f"⚠️ ALERTE : erreur SQL sur {table}",
f"Erreur SQL détectée sur la table {table}.\n\nDétail :\n{e}"
)
enter_alert_state(table, None)
alertes_envoyees.append(f"{table} (SQL)")
except Exception as notif_e:
logging.error(f"Impossible d'envoyer les notifications SQL pour {table} : {notif_e}")
else:
logging.info(f"{table} : erreur SQL déjà signalée, rappel dans {RAPPEL_HEURES}h.")
return
# Cas défaut : aucune donnée ou donnée trop ancienne
if (last_update is None) or (last_update < limite):
defauts_en_cours.append(table)
if should_send_alert(table):
logging.warning(f"⚠️ {table} en défaut (dernier relevé : {last_update})")
try:
envoyer_notifications(
table,
f"⚠️ ALERTE : {table} absence de relevés",
f"Pas de relevés depuis plus de {DELAI_MINUTES} min.\nDernier relevé : {last_update}"
)
enter_alert_state(table, last_update)
alertes_envoyees.append(f"{table} (dernier : {last_update})")
except Exception as notif_e:
logging.error(f"Erreur envoi notifications alerte pour {table} : {notif_e}")
else:
logging.info(f"{table} déjà signalé, rappel dans {RAPPEL_HEURES}h.")
return
# Cas normal : de nouvelles données sont présentes
was_alerting = is_alerting(table)
previous_state = read_state(table)
previous_last_data = previous_state.get("last_data_at")
current_last_data = dt_to_iso(last_update)
# on mémorise la dernière donnée vue, même en état normal
update_last_data(table, last_update)
# retour à la normale seulement si on sort réellement d'un état d'alerte
# et qu'une donnée plus récente est arrivée
if was_alerting and current_last_data != previous_last_data:
message = f"{table} : relevés à nouveau reçus (dernier : {last_update}). Situation normale."
try:
envoyer_notifications(
table,
f"✅ OK : {table} relevés reçus",
message
)
clear_state(table)
logging.info(f"📩 Retour à la normale envoyé pour {table}.")
except Exception as notif_e:
logging.error(f"Erreur envoi notifications retour à la normale pour {table} : {notif_e}")
else:
logging.info(f"{table} OK (dernier relevé : {last_update})")
# ============================================================
# MAIN
# ============================================================
def main() -> None:
limite = datetime.now() - timedelta(minutes=DELAI_MINUTES)
defauts_en_cours: list[str] = []
alertes_envoyees: list[str] = []
erreurs_sql: list[str] = []
try:
cnx = cast(MySQLConnection, utils_db.connect_to_mysql())
with closing(cnx):
cursor = cast(MySQLCursor, cnx.cursor())
with closing(cursor):
tables_a_surveiller = get_tables_a_surveiller(cursor)
tables_autorisees = set(tables_a_surveiller)
for table in tables_a_surveiller:
traiter_table(
cursor=cursor,
table=table,
limite=limite,
tables_autorisees=tables_autorisees,
defauts_en_cours=defauts_en_cours,
alertes_envoyees=alertes_envoyees,
erreurs_sql=erreurs_sql,
)
except mysql.connector.Error as e:
logging.error(f"MySQL KO : {e}")
try:
envoyer_notifications(
"GLOBAL",
"⚠️ ALERTE : Base MySQL inaccessible",
"Connexion MySQL impossible : la surveillance des relevés ne peut pas sexécuter."
)
except Exception as notif_e:
logging.error(f"Impossible d'envoyer les notifications MySQL KO : {notif_e}")
return
if alertes_envoyees:
logging.info("📧/💬 Notification(s) envoyée(s) : " + ", ".join(alertes_envoyees))
elif defauts_en_cours or erreurs_sql:
bloc = []
if defauts_en_cours:
bloc.append("défaut(s) relevés : " + ", ".join(defauts_en_cours))
if erreurs_sql:
bloc.append("erreur(s) SQL : " + ", ".join(erreurs_sql))
logging.info("⚠️ " + " | ".join(bloc) + " (déjà signalé / pas de notification envoyée à ce run)")
else:
logging.info("👍 Tout est OK, aucune notification envoyée.")
if __name__ == "__main__":
main()