275 lines
7.5 KiB
Python
275 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Mqtt_Meudon.py
|
|
--------------------------------
|
|
- S'abonne à Meudon/# sur le broker MQTT
|
|
- Parse les messages (topic -> nom de sonde, payload -> température)
|
|
- Insère les relevés dans MySQL (table `Meudon` par défaut)
|
|
|
|
Variables d'environnement attendues (exemples) :
|
|
- MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS
|
|
- DB_HOST, DB_USER, DB_PASS, DB_NAME
|
|
Optionnelles :
|
|
- DB_TABLE (défaut: Meudon)
|
|
- LOG_FILE (défaut: Mqtt_meudon.log)
|
|
- LOG_LEVEL (défaut: INFO)
|
|
- DB_POOL_SIZE (défaut: 5)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from typing import Any, Optional, cast
|
|
|
|
import paho.mqtt.client as mqtt
|
|
from paho.mqtt.client import CallbackAPIVersion
|
|
|
|
from mysql.connector import pooling
|
|
from mysql.connector.abstracts import MySQLConnectionAbstract
|
|
from mysql.connector.cursor import MySQLCursor
|
|
|
|
|
|
# =========================
|
|
# Configuration (ENV)
|
|
# =========================
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
DB_HOST = os.getenv("DB_HOST", "localhost")
|
|
DB_USER = os.getenv("DB_USER")
|
|
DB_PASS = os.getenv("DB_PASS")
|
|
DB_NAME = os.getenv("DB_NAME", "Sondes")
|
|
|
|
DB_TABLE = os.getenv("DB_TABLE", "Meudon")
|
|
DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "5"))
|
|
|
|
MQTT_HOST = os.getenv("MQTT_HOST", "192.168.1.100")
|
|
MQTT_USER = os.getenv("MQTT_USER", "Sondes")
|
|
MQTT_PASS = os.getenv("MQTT_PASS", "3J@bjYP0")
|
|
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
|
|
|
|
GYRO_TOPIC_MEUDON = os.getenv("GYRO_MQTT_TOPIC_MEUDON", "Meudon/gyrophare")
|
|
|
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
|
LOG_FILE = os.getenv(
|
|
"LOG_FILE",
|
|
"/home/domo91/Gestion_sondes/Logs/cuisine_meudon_script.log"
|
|
)
|
|
# =========================
|
|
# Logging
|
|
# =========================
|
|
|
|
def setup_logging() -> None:
|
|
"""Configure un log propre (rotation) sans dupliquer les handlers."""
|
|
logger = logging.getLogger()
|
|
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
|
|
|
|
# Évite la duplication si setup_logging() est rappelé
|
|
if any(isinstance(h, RotatingFileHandler) for h in logger.handlers):
|
|
return
|
|
|
|
fmt = logging.Formatter(
|
|
"%(asctime)s | %(levelname)s | %(name)s | %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
|
|
file_handler = RotatingFileHandler(
|
|
LOG_FILE,
|
|
maxBytes=2_000_000,
|
|
backupCount=5,
|
|
encoding="utf-8",
|
|
)
|
|
file_handler.setFormatter(fmt)
|
|
file_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
|
|
|
|
console_handler = logging.StreamHandler(sys.stdout)
|
|
console_handler.setFormatter(fmt)
|
|
console_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
|
|
|
|
logger.addHandler(file_handler)
|
|
logger.addHandler(console_handler)
|
|
|
|
|
|
# =========================
|
|
# MySQL (pool)
|
|
# =========================
|
|
|
|
_db_pool: Optional[pooling.MySQLConnectionPool] = None
|
|
|
|
|
|
def init_db_pool() -> None:
|
|
global _db_pool
|
|
if _db_pool is not None:
|
|
return
|
|
|
|
_db_pool = pooling.MySQLConnectionPool(
|
|
pool_name="meudon_pool",
|
|
pool_size=DB_POOL_SIZE,
|
|
host=DB_HOST,
|
|
user=DB_USER,
|
|
password=DB_PASS,
|
|
database=DB_NAME,
|
|
autocommit=False,
|
|
)
|
|
logging.info("Pool MySQL initialisé (db=%s, size=%s)", DB_NAME, DB_POOL_SIZE)
|
|
|
|
|
|
def insert_temperature(sonde: str, temperature: float) -> None:
|
|
"""Insère un relevé dans la table configurée."""
|
|
if _db_pool is None:
|
|
init_db_pool()
|
|
|
|
conn: Optional[MySQLConnectionAbstract] = None
|
|
cursor: Optional[MySQLCursor] = None
|
|
|
|
try:
|
|
assert _db_pool is not None # pour le typage
|
|
conn = cast(MySQLConnectionAbstract, _db_pool.get_connection())
|
|
cursor = conn.cursor()
|
|
|
|
# Colonnes supposées : Sonde, Temperature, Date
|
|
sql = f"INSERT INTO `{DB_TABLE}` (Sonde, Temperature, Date) VALUES (%s, %s, NOW())"
|
|
cursor.execute(sql, (sonde, temperature))
|
|
conn.commit()
|
|
|
|
except Exception as exc:
|
|
logging.error("Erreur MySQL (insert %s): %s", sonde, exc)
|
|
# En cas d'erreur, on rollback pour garder la connexion saine
|
|
try:
|
|
if conn is not None:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
|
|
finally:
|
|
try:
|
|
if cursor is not None:
|
|
cursor.close()
|
|
finally:
|
|
if conn is not None:
|
|
conn.close()
|
|
|
|
|
|
# =========================
|
|
# MQTT callbacks
|
|
# =========================
|
|
|
|
def _topic_to_sonde(topic: str) -> str:
|
|
"""Convertit un topic MQTT en nom de sonde.
|
|
Exemple : 'Meudon/Chambre1' -> 'Chambre1'
|
|
"""
|
|
parts = topic.split("/")
|
|
if len(parts) >= 2:
|
|
return parts[-1].strip()
|
|
return topic.strip()
|
|
|
|
|
|
def _payload_to_float(payload: bytes) -> Optional[float]:
|
|
"""Convertit un payload en float (accepte virgule)."""
|
|
try:
|
|
s = payload.decode("utf-8", errors="replace").strip()
|
|
if not s:
|
|
return None
|
|
s = s.replace(",", ".")
|
|
return float(s)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def on_connect(
|
|
client: mqtt.Client,
|
|
_userdata: Any,
|
|
_flags: Any,
|
|
reason_code: int,
|
|
properties: Any = None,
|
|
) -> None:
|
|
# `properties` est fourni par l'API v2 ; on le garde pour compatibilité.
|
|
_ = properties # évite les avertissements "non utilisé"
|
|
|
|
if reason_code == 0:
|
|
logging.info("Connecté au broker MQTT Meudon (%s:%s)", MQTT_HOST, MQTT_PORT)
|
|
result, mid = client.subscribe("Meudon/#")
|
|
logging.info("Abonné au topic : Meudon/# (result=%s, mid=%s)", result, mid)
|
|
else:
|
|
logging.error("Échec de connexion MQTT (Meudon), reason_code=%s", reason_code)
|
|
|
|
|
|
def on_disconnect(
|
|
_client: mqtt.Client,
|
|
_userdata: Any,
|
|
reason_code: int,
|
|
properties: Any = None,
|
|
) -> None:
|
|
_ = properties
|
|
logging.warning("Déconnecté du broker MQTT (reason_code=%s)", reason_code)
|
|
|
|
|
|
def on_message(_client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
|
|
topic = (msg.topic or "").strip()
|
|
sonde = _topic_to_sonde(topic)
|
|
temp = _payload_to_float(msg.payload)
|
|
|
|
if temp is None:
|
|
logging.warning("Payload non numérique ignoré (topic=%s, payload=%r)", topic, msg.payload)
|
|
return
|
|
|
|
logging.info("Reçu: topic=%s -> sonde=%s | température=%s", topic, sonde, temp)
|
|
insert_temperature(sonde, temp)
|
|
|
|
|
|
# =========================
|
|
# Main
|
|
# =========================
|
|
|
|
def build_mqtt_client() -> mqtt.Client:
|
|
client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)
|
|
|
|
if MQTT_USER:
|
|
client.username_pw_set(MQTT_USER, MQTT_PASS)
|
|
|
|
client.on_connect = on_connect
|
|
client.on_disconnect = on_disconnect
|
|
client.on_message = on_message
|
|
|
|
# Reconnexion automatique
|
|
client.reconnect_delay_set(min_delay=1, max_delay=30)
|
|
|
|
return client
|
|
|
|
|
|
def main() -> None:
|
|
setup_logging()
|
|
init_db_pool()
|
|
|
|
client = build_mqtt_client()
|
|
|
|
while True:
|
|
try:
|
|
logging.info("Connexion MQTT en cours (%s:%s)...", MQTT_HOST, MQTT_PORT)
|
|
client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
|
|
break
|
|
except Exception as exc:
|
|
logging.error("Impossible de se connecter au broker MQTT: %s", exc)
|
|
time.sleep(5)
|
|
|
|
logging.info("Boucle MQTT Meudon en cours (Ctrl+C pour arrêter)...")
|
|
try:
|
|
client.loop_forever()
|
|
except KeyboardInterrupt:
|
|
logging.info("Arrêt demandé par l'utilisateur (Meudon)." )
|
|
finally:
|
|
try:
|
|
client.disconnect()
|
|
except Exception:
|
|
pass
|
|
logging.info("Client MQTT Meudon déconnecté.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|