diff --git a/app/Mqtt_meudon.py b/app/Mqtt_meudon.py index b12f084..97351bf 100644 --- a/app/Mqtt_meudon.py +++ b/app/Mqtt_meudon.py @@ -1,201 +1,266 @@ #!/usr/bin/env python3 """ -Mqtt_meudon.py -Récupère les mesures MQTT du site Meudon et les insère dans la table Sondes.Meudon. +Mqtt_meudon.py (version nettoyée) +-------------------------------- +- S'abonne à Meudon/# sur le broker MQTT +- Parse les messages (topic -> nom de sonde, payload -> température) +- Insère les relevés dans MySQL (table `Meudon` par défaut) + +Variables d'environnement attendues (exemples) : +- MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS +- DB_HOST, DB_USER, DB_PASS, DB_NAME +Optionnelles : +- DB_TABLE (défaut: Meudon) +- LOG_FILE (défaut: Mqtt_meudon.log) +- LOG_LEVEL (défaut: INFO) +- DB_POOL_SIZE (défaut: 5) """ +from __future__ import annotations + import os +import sys +import time import logging from logging.handlers import RotatingFileHandler -import socket -import mysql.connector -from mysql.connector import Error +from typing import Any, Optional, cast import paho.mqtt.client as mqtt from paho.mqtt.client import CallbackAPIVersion -from dotenv import load_dotenv +from mysql.connector import pooling +from mysql.connector.abstracts import MySQLConnectionAbstract +from mysql.connector.cursor import MySQLCursor + # ========================= -# Chargement du .env +# Configuration (ENV) # ========================= -load_dotenv() -# --- MySQL (commun) --- -DB_HOST = os.getenv("DB_HOST") -DB_USER = os.getenv("DB_USER") -DB_PASS = os.getenv("DB_PASS") -DB_NAME = os.getenv("DB_NAME") - -# --- MQTT --- -MQTT_HOST = os.getenv("MQTT_HOST") -MQTT_USER = os.getenv("MQTT_USER") -MQTT_PASS = os.getenv("MQTT_PASS") +MQTT_HOST = os.getenv("MQTT_HOST", "127.0.0.1") MQTT_PORT = int(os.getenv("MQTT_PORT", "1883")) +MQTT_USER = os.getenv("MQTT_USER", "") +MQTT_PASS = os.getenv("MQTT_PASS", "") -# Client ID (configurable, sinon suffixé avec le hostname) -MQTT_CLIENT_ID = os.getenv( - "MQTT_CLIENT_ID_MEUDON", - f"Mqtt_meudon_{socket.gethostname()}" -) +DB_HOST = os.getenv("DB_HOST", "127.0.0.1") +DB_USER = os.getenv("DB_USER", "root") +DB_PASS = os.getenv("DB_PASS", "") +DB_NAME = os.getenv("DB_NAME", "Sondes") +DB_TABLE = os.getenv("DB_TABLE", "Meudon") -GYRO_TOPIC_MEUDON = os.getenv("GYRO_MQTT_TOPIC_MEUDON", "Meudon/gyrophare") +LOG_FILE = os.getenv("LOG_FILE", "Mqtt_meudon.log") +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() + +DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "5")) -# Nom de la table de destination -TABLE_NAME = "Meudon" # ========================= -# Logging +# Logging # ========================= -def setup_logging(): + +def setup_logging() -> None: + """Configure un log propre (rotation) sans dupliquer les handlers.""" logger = logging.getLogger() - logger.setLevel(logging.INFO) + logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) - formatter = logging.Formatter( - "%(asctime)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" + # Évite la duplication si setup_logging() est rappelé + if any(isinstance(h, RotatingFileHandler) for h in logger.handlers): + return + + fmt = logging.Formatter( + "%(asctime)s | %(levelname)s | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) - # Console - console = logging.StreamHandler() - console.setFormatter(formatter) - logger.addHandler(console) + file_handler = RotatingFileHandler( + LOG_FILE, + maxBytes=2_000_000, + backupCount=5, + encoding="utf-8", + ) + file_handler.setFormatter(fmt) + file_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) - # Logs fichier (même logique que Saclay) - log_dir = os.getenv("LOG_DIR", "./Logs") - try: - os.makedirs(log_dir, exist_ok=True) - file_handler = RotatingFileHandler( - os.path.join(log_dir, "Mqtt_meudon.log"), - maxBytes=1_000_000, - backupCount=5, - encoding="utf-8", - ) - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) - except Exception as e: - logging.warning("Impossible de créer le fichier de log : %s", e) + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(fmt) + console_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) + + logger.addHandler(file_handler) + logger.addHandler(console_handler) # ========================= -# Accès MySQL +# MySQL (pool) # ========================= + +_db_pool: Optional[pooling.MySQLConnectionPool] = None + + +def init_db_pool() -> None: + global _db_pool + if _db_pool is not None: + return + + _db_pool = pooling.MySQLConnectionPool( + pool_name="meudon_pool", + pool_size=DB_POOL_SIZE, + host=DB_HOST, + user=DB_USER, + password=DB_PASS, + database=DB_NAME, + autocommit=False, + ) + logging.info("Pool MySQL initialisé (db=%s, size=%s)", DB_NAME, DB_POOL_SIZE) + + def insert_temperature(sonde: str, temperature: float) -> None: - """ - Insère une mesure dans la table Sondes.Meudon. - La colonne Date utilise CURRENT_TIMESTAMP par défaut dans MySQL. - """ - try: - conn = mysql.connector.connect( - host=DB_HOST, - user=DB_USER, - password=DB_PASS, - database=DB_NAME, - ) + """Insère un relevé dans la table configurée.""" + if _db_pool is None: + init_db_pool() + conn: Optional[MySQLConnectionAbstract] = None + cursor: Optional[MySQLCursor] = None + + try: + assert _db_pool is not None # pour le typage + conn = cast(MySQLConnectionAbstract, _db_pool.get_connection()) cursor = conn.cursor() - sql = f"INSERT INTO {TABLE_NAME} (Sonde, Temperature) VALUES (%s, %s)" + + # Colonnes supposées : Sonde, Temperature, Date + sql = f"INSERT INTO `{DB_TABLE}` (Sonde, Temperature, Date) VALUES (%s, %s, NOW())" cursor.execute(sql, (sonde, temperature)) conn.commit() - logging.info("Insertion OK (Meudon) -> %s = %.2f", sonde, temperature) - - except Error as e: - logging.exception("Erreur MySQL (Meudon) pour la sonde %s : %s", sonde, e) - - finally: + except Exception as exc: + logging.error("Erreur MySQL (insert %s): %s", sonde, exc) + # En cas d'erreur, on rollback pour garder la connexion saine try: - if cursor: - cursor.close() - if conn and conn.is_connected(): - conn.close() + if conn is not None: + conn.rollback() except Exception: pass + finally: + try: + if cursor is not None: + cursor.close() + finally: + if conn is not None: + conn.close() + # ========================= -# Callbacks MQTT (API v2) +# MQTT callbacks # ========================= -def on_connect(client, userdata, flags, reason_code, properties=None): + +def _topic_to_sonde(topic: str) -> str: + """Convertit un topic MQTT en nom de sonde. + Exemple : 'Meudon/Chambre1' -> 'Chambre1' + """ + parts = topic.split("/") + if len(parts) >= 2: + return parts[-1].strip() + return topic.strip() + + +def _payload_to_float(payload: bytes) -> Optional[float]: + """Convertit un payload en float (accepte virgule).""" + try: + s = payload.decode("utf-8", errors="replace").strip() + if not s: + return None + s = s.replace(",", ".") + return float(s) + except Exception: + return None + + +def on_connect( + client: mqtt.Client, + _userdata: Any, + _flags: Any, + reason_code: int, + properties: Any = None, +) -> None: + # `properties` est fourni par l'API v2 ; on le garde pour compatibilité. + _ = properties # évite les avertissements "non utilisé" + if reason_code == 0: - logging.info("Connecté au broker MQTT Meudon (%s)", MQTT_HOST) - # Abonnement à TOUT ce qui commence par "Meudon/" + logging.info("Connecté au broker MQTT Meudon (%s:%s)", MQTT_HOST, MQTT_PORT) result, mid = client.subscribe("Meudon/#") logging.info("Abonné au topic : Meudon/# (result=%s, mid=%s)", result, mid) else: - logging.error("Échec de connexion MQTT (Meudon), code retour = %s", reason_code) + logging.error("Échec de connexion MQTT (Meudon), reason_code=%s", reason_code) -def on_message(client, userdata, msg: mqtt.MQTTMessage): - topic = msg.topic - payload_raw = msg.payload.decode("utf-8", errors="ignore").strip() +def on_disconnect( + _client: mqtt.Client, + _userdata: Any, + reason_code: int, + properties: Any = None, +) -> None: + _ = properties + logging.warning("Déconnecté du broker MQTT (reason_code=%s)", reason_code) - logging.debug("Msg reçu (Meudon) : topic=%s payload=%s", topic, payload_raw) - # On ignore le gyrophare - if topic == GYRO_TOPIC_MEUDON: - logging.debug("Topic gyrophare Meudon ignoré : %s", topic) +def on_message(_client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None: + topic = (msg.topic or "").strip() + sonde = _topic_to_sonde(topic) + temp = _payload_to_float(msg.payload) + + if temp is None: + logging.warning("Payload non numérique ignoré (topic=%s, payload=%r)", topic, msg.payload) return - # Nom de la sonde = dernier segment du topic - sonde = topic.split("/")[-1] if "/" in topic else topic - - # Conversion du payload en float - try: - value = float(payload_raw.replace(",", ".")) - except ValueError: - logging.warning( - "Payload non numérique (Meudon), mesure ignorée (topic=%s, payload=%s)", - topic, - payload_raw, - ) - return - - insert_temperature(sonde, value) + logging.info("Reçu: topic=%s -> sonde=%s | température=%s", topic, sonde, temp) + insert_temperature(sonde, temp) # ========================= -# Programme principal +# Main # ========================= -def main(): - setup_logging() - logging.info("Démarrage du script Mqtt_meudon") - # Vérif minimale des variables d'env MySQL - for var in ["DB_HOST", "DB_USER", "DB_PASS", "DB_NAME"]: - if os.getenv(var) in (None, ""): - logging.error("Variable d'environnement %s manquante !", var) +def build_mqtt_client() -> mqtt.Client: + client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2) - # Vérif minimale des variables d'env MQTT - if not MQTT_HOST: - logging.error("MQTT_HOST_MEUDON manquant !") - if not MQTT_USER: - logging.warning("MQTT_USER_MEUDON non défini (connexion sans login ?)") - if not MQTT_PORT or MQTT_PORT <= 0: - logging.error("MQTT_PORT_MEUDON invalide : %s", MQTT_PORT) - - client = mqtt.Client( - client_id=MQTT_CLIENT_ID, - callback_api_version=CallbackAPIVersion.VERSION2 - ) - client.username_pw_set(MQTT_USER, MQTT_PASS) + if MQTT_USER: + client.username_pw_set(MQTT_USER, MQTT_PASS) client.on_connect = on_connect + client.on_disconnect = on_disconnect client.on_message = on_message - try: - client.connect(MQTT_HOST, MQTT_PORT, keepalive=60) - except Exception as e: - logging.exception("Impossible de se connecter au broker MQTT Meudon : %s", e) - return + # Reconnexion automatique + client.reconnect_delay_set(min_delay=1, max_delay=30) + + return client + + +def main() -> None: + setup_logging() + init_db_pool() + + client = build_mqtt_client() + + while True: + try: + logging.info("Connexion MQTT en cours (%s:%s)...", MQTT_HOST, MQTT_PORT) + client.connect(MQTT_HOST, MQTT_PORT, keepalive=60) + break + except Exception as exc: + logging.error("Impossible de se connecter au broker MQTT: %s", exc) + time.sleep(5) logging.info("Boucle MQTT Meudon en cours (Ctrl+C pour arrêter)...") try: client.loop_forever() except KeyboardInterrupt: - logging.info("Arrêt demandé par l'utilisateur (Meudon).") + logging.info("Arrêt demandé par l'utilisateur (Meudon)." ) finally: - client.disconnect() + try: + client.disconnect() + except Exception: + pass logging.info("Client MQTT Meudon déconnecté.") diff --git a/app/Mqtt_saclay.py b/app/Mqtt_saclay.py index 212888f..2fe586f 100644 --- a/app/Mqtt_saclay.py +++ b/app/Mqtt_saclay.py @@ -26,9 +26,9 @@ DB_USER = os.getenv("DB_USER") DB_PASS = os.getenv("DB_PASS") DB_NAME = os.getenv("DB_NAME") -MQTT_HOST = "54.36.188.119" -MQTT_USER = "Bwps" -MQTT_PASS = "scJ5ACj2keRfI^" +MQTT_HOST = "162.19.78.131" +MQTT_USER = "sondes" +MQTT_PASS = "3J@bjYP0" MQTT_PORT = int(os.getenv("MQTT_PORT", 1883)) GYRO_TOPIC_SACLAY = os.getenv("GYRO_MQTT_TOPIC_SACLAY", "Saclay/gyrophare") diff --git a/app/surveillance_releves.py b/app/surveillance_releves.py index d52f074..63d9bed 100644 --- a/app/surveillance_releves.py +++ b/app/surveillance_releves.py @@ -5,12 +5,24 @@ from datetime import datetime, timedelta from dotenv import load_dotenv import os import logging - +import sys import mysql.connector # important pour cibler les exceptions MySQL import utils_db from utils_mail import envoyer_mail + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler(sys.stdout)] +) + +# Forcer l'encodage UTF-8 du flux si possible (Windows/PyCharm) +try: + sys.stdout.reconfigure(encoding="utf-8") +except Exception: + pass # -------------------- LOGS -------------------- LOG_DIR = '/home/debian/Gestion_sondes/Logs' os.makedirs(LOG_DIR, exist_ok=True) diff --git a/app/utils_mail.py b/app/utils_mail.py index 41e6fd1..d8ba75f 100644 --- a/app/utils_mail.py +++ b/app/utils_mail.py @@ -10,8 +10,8 @@ load_dotenv('/home/debian/Gestion_sondes/.env') SMTP_HOST = os.getenv("SMTP_HOST", "smtp.mail.ovh.net") SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) -SMTP_LOGIN = os.getenv("SMTP_LOGIN") # ex: services@domo91.fr -SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") # mot de passe OVH +SMTP_LOGIN = os.getenv("SMTP_USER") # ex: services@domo91.fr +SMTP_PASSWORD = os.getenv("SMTP_PASS") # mot de passe OVH MAIL_FROM = os.getenv("MAIL_FROM", SMTP_LOGIN) MAIL_TO = os.getenv("MAIL_TO") # ex: services@domo91.fr