#!/usr/bin/env python3 """ Mqtt_Meudon.py (version nettoyée) -------------------------------- - S'abonne à Meudon/# sur le broker MQTT - Parse les messages (topic -> nom de sonde, payload -> température) - Insère les relevés dans MySQL (table `Meudon` par défaut) Variables d'environnement attendues (exemples) : - MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS - DB_HOST, DB_USER, DB_PASS, DB_NAME Optionnelles : - DB_TABLE (défaut: Meudon) - LOG_FILE (défaut: Mqtt_meudon.log) - LOG_LEVEL (défaut: INFO) - DB_POOL_SIZE (défaut: 5) """ from __future__ import annotations import os import sys import time import logging from logging.handlers import RotatingFileHandler from typing import Any, Optional, cast import paho.mqtt.client as mqtt from paho.mqtt.client import CallbackAPIVersion from mysql.connector import pooling from mysql.connector.abstracts import MySQLConnectionAbstract from mysql.connector.cursor import MySQLCursor # ========================= # Configuration (ENV) # ========================= MQTT_HOST = os.getenv("MQTT_HOST", "127.0.0.1") MQTT_PORT = int(os.getenv("MQTT_PORT", "1883")) MQTT_USER = os.getenv("MQTT_USER", "") MQTT_PASS = os.getenv("MQTT_PASS", "") DB_HOST = os.getenv("DB_HOST", "127.0.0.1") DB_USER = os.getenv("DB_USER", "root") DB_PASS = os.getenv("DB_PASS", "") DB_NAME = os.getenv("DB_NAME", "Sondes") DB_TABLE = os.getenv("DB_TABLE", "Meudon") LOG_FILE = os.getenv("LOG_FILE", "Mqtt_meudon.log") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "5")) # ========================= # Logging # ========================= def setup_logging() -> None: """Configure un log propre (rotation) sans dupliquer les handlers.""" logger = logging.getLogger() logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) # Évite la duplication si setup_logging() est rappelé if any(isinstance(h, RotatingFileHandler) for h in logger.handlers): return fmt = logging.Formatter( "%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) file_handler = RotatingFileHandler( LOG_FILE, maxBytes=2_000_000, backupCount=5, encoding="utf-8", ) file_handler.setFormatter(fmt) file_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(fmt) console_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) logger.addHandler(file_handler) logger.addHandler(console_handler) # ========================= # MySQL (pool) # ========================= _db_pool: Optional[pooling.MySQLConnectionPool] = None def init_db_pool() -> None: global _db_pool if _db_pool is not None: return _db_pool = pooling.MySQLConnectionPool( pool_name="meudon_pool", pool_size=DB_POOL_SIZE, host=DB_HOST, user=DB_USER, password=DB_PASS, database=DB_NAME, autocommit=False, ) logging.info("Pool MySQL initialisé (db=%s, size=%s)", DB_NAME, DB_POOL_SIZE) def insert_temperature(sonde: str, temperature: float) -> None: """Insère un relevé dans la table configurée.""" if _db_pool is None: init_db_pool() conn: Optional[MySQLConnectionAbstract] = None cursor: Optional[MySQLCursor] = None try: assert _db_pool is not None # pour le typage conn = cast(MySQLConnectionAbstract, _db_pool.get_connection()) cursor = conn.cursor() # Colonnes supposées : Sonde, Temperature, Date sql = f"INSERT INTO `{DB_TABLE}` (Sonde, Temperature, Date) VALUES (%s, %s, NOW())" cursor.execute(sql, (sonde, temperature)) conn.commit() except Exception as exc: logging.error("Erreur MySQL (insert %s): %s", sonde, exc) # En cas d'erreur, on rollback pour garder la connexion saine try: if conn is not None: conn.rollback() except Exception: pass finally: try: if cursor is not None: cursor.close() finally: if conn is not None: conn.close() # ========================= # MQTT callbacks # ========================= def _topic_to_sonde(topic: str) -> str: """Convertit un topic MQTT en nom de sonde. Exemple : 'Meudon/Chambre1' -> 'Chambre1' """ parts = topic.split("/") if len(parts) >= 2: return parts[-1].strip() return topic.strip() def _payload_to_float(payload: bytes) -> Optional[float]: """Convertit un payload en float (accepte virgule).""" try: s = payload.decode("utf-8", errors="replace").strip() if not s: return None s = s.replace(",", ".") return float(s) except Exception: return None def on_connect( client: mqtt.Client, _userdata: Any, _flags: Any, reason_code: int, properties: Any = None, ) -> None: # `properties` est fourni par l'API v2 ; on le garde pour compatibilité. _ = properties # évite les avertissements "non utilisé" if reason_code == 0: logging.info("Connecté au broker MQTT Meudon (%s:%s)", MQTT_HOST, MQTT_PORT) result, mid = client.subscribe("Meudon/#") logging.info("Abonné au topic : Meudon/# (result=%s, mid=%s)", result, mid) else: logging.error("Échec de connexion MQTT (Meudon), reason_code=%s", reason_code) def on_disconnect( _client: mqtt.Client, _userdata: Any, reason_code: int, properties: Any = None, ) -> None: _ = properties logging.warning("Déconnecté du broker MQTT (reason_code=%s)", reason_code) def on_message(_client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None: topic = (msg.topic or "").strip() sonde = _topic_to_sonde(topic) temp = _payload_to_float(msg.payload) if temp is None: logging.warning("Payload non numérique ignoré (topic=%s, payload=%r)", topic, msg.payload) return logging.info("Reçu: topic=%s -> sonde=%s | température=%s", topic, sonde, temp) insert_temperature(sonde, temp) # ========================= # Main # ========================= def build_mqtt_client() -> mqtt.Client: client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2) if MQTT_USER: client.username_pw_set(MQTT_USER, MQTT_PASS) client.on_connect = on_connect client.on_disconnect = on_disconnect client.on_message = on_message # Reconnexion automatique client.reconnect_delay_set(min_delay=1, max_delay=30) return client def main() -> None: setup_logging() init_db_pool() client = build_mqtt_client() while True: try: logging.info("Connexion MQTT en cours (%s:%s)...", MQTT_HOST, MQTT_PORT) client.connect(MQTT_HOST, MQTT_PORT, keepalive=60) break except Exception as exc: logging.error("Impossible de se connecter au broker MQTT: %s", exc) time.sleep(5) logging.info("Boucle MQTT Meudon en cours (Ctrl+C pour arrêter)...") try: client.loop_forever() except KeyboardInterrupt: logging.info("Arrêt demandé par l'utilisateur (Meudon)." ) finally: try: client.disconnect() except Exception: pass logging.info("Client MQTT Meudon déconnecté.") if __name__ == "__main__": main()