Modif watchdog relevés
This commit is contained in:
@@ -1,201 +1,266 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Mqtt_meudon.py
|
||||
Récupère les mesures MQTT du site Meudon et les insère dans la table Sondes.Meudon.
|
||||
Mqtt_meudon.py (version nettoyée)
|
||||
--------------------------------
|
||||
- S'abonne à Meudon/# sur le broker MQTT
|
||||
- Parse les messages (topic -> nom de sonde, payload -> température)
|
||||
- Insère les relevés dans MySQL (table `Meudon` par défaut)
|
||||
|
||||
Variables d'environnement attendues (exemples) :
|
||||
- MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS
|
||||
- DB_HOST, DB_USER, DB_PASS, DB_NAME
|
||||
Optionnelles :
|
||||
- DB_TABLE (défaut: Meudon)
|
||||
- LOG_FILE (défaut: Mqtt_meudon.log)
|
||||
- LOG_LEVEL (défaut: INFO)
|
||||
- DB_POOL_SIZE (défaut: 5)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
import socket
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
from typing import Any, Optional, cast
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
from paho.mqtt.client import CallbackAPIVersion
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from mysql.connector import pooling
|
||||
from mysql.connector.abstracts import MySQLConnectionAbstract
|
||||
from mysql.connector.cursor import MySQLCursor
|
||||
|
||||
|
||||
# =========================
|
||||
# Chargement du .env
|
||||
# Configuration (ENV)
|
||||
# =========================
|
||||
load_dotenv()
|
||||
|
||||
# --- MySQL (commun) ---
|
||||
DB_HOST = os.getenv("DB_HOST")
|
||||
DB_USER = os.getenv("DB_USER")
|
||||
DB_PASS = os.getenv("DB_PASS")
|
||||
DB_NAME = os.getenv("DB_NAME")
|
||||
|
||||
# --- MQTT ---
|
||||
MQTT_HOST = os.getenv("MQTT_HOST")
|
||||
MQTT_USER = os.getenv("MQTT_USER")
|
||||
MQTT_PASS = os.getenv("MQTT_PASS")
|
||||
MQTT_HOST = os.getenv("MQTT_HOST", "127.0.0.1")
|
||||
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
|
||||
MQTT_USER = os.getenv("MQTT_USER", "")
|
||||
MQTT_PASS = os.getenv("MQTT_PASS", "")
|
||||
|
||||
# Client ID (configurable, sinon suffixé avec le hostname)
|
||||
MQTT_CLIENT_ID = os.getenv(
|
||||
"MQTT_CLIENT_ID_MEUDON",
|
||||
f"Mqtt_meudon_{socket.gethostname()}"
|
||||
)
|
||||
DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
|
||||
DB_USER = os.getenv("DB_USER", "root")
|
||||
DB_PASS = os.getenv("DB_PASS", "")
|
||||
DB_NAME = os.getenv("DB_NAME", "Sondes")
|
||||
DB_TABLE = os.getenv("DB_TABLE", "Meudon")
|
||||
|
||||
GYRO_TOPIC_MEUDON = os.getenv("GYRO_MQTT_TOPIC_MEUDON", "Meudon/gyrophare")
|
||||
LOG_FILE = os.getenv("LOG_FILE", "Mqtt_meudon.log")
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
|
||||
DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "5"))
|
||||
|
||||
# Nom de la table de destination
|
||||
TABLE_NAME = "Meudon"
|
||||
|
||||
# =========================
|
||||
# Logging
|
||||
# Logging
|
||||
# =========================
|
||||
def setup_logging():
|
||||
|
||||
def setup_logging() -> None:
|
||||
"""Configure un log propre (rotation) sans dupliquer les handlers."""
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
|
||||
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(levelname)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
# Évite la duplication si setup_logging() est rappelé
|
||||
if any(isinstance(h, RotatingFileHandler) for h in logger.handlers):
|
||||
return
|
||||
|
||||
fmt = logging.Formatter(
|
||||
"%(asctime)s | %(levelname)s | %(name)s | %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
|
||||
# Console
|
||||
console = logging.StreamHandler()
|
||||
console.setFormatter(formatter)
|
||||
logger.addHandler(console)
|
||||
file_handler = RotatingFileHandler(
|
||||
LOG_FILE,
|
||||
maxBytes=2_000_000,
|
||||
backupCount=5,
|
||||
encoding="utf-8",
|
||||
)
|
||||
file_handler.setFormatter(fmt)
|
||||
file_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
|
||||
|
||||
# Logs fichier (même logique que Saclay)
|
||||
log_dir = os.getenv("LOG_DIR", "./Logs")
|
||||
try:
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
file_handler = RotatingFileHandler(
|
||||
os.path.join(log_dir, "Mqtt_meudon.log"),
|
||||
maxBytes=1_000_000,
|
||||
backupCount=5,
|
||||
encoding="utf-8",
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
logger.addHandler(file_handler)
|
||||
except Exception as e:
|
||||
logging.warning("Impossible de créer le fichier de log : %s", e)
|
||||
console_handler = logging.StreamHandler(sys.stdout)
|
||||
console_handler.setFormatter(fmt)
|
||||
console_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
|
||||
|
||||
logger.addHandler(file_handler)
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
|
||||
# =========================
|
||||
# Accès MySQL
|
||||
# MySQL (pool)
|
||||
# =========================
|
||||
|
||||
_db_pool: Optional[pooling.MySQLConnectionPool] = None
|
||||
|
||||
|
||||
def init_db_pool() -> None:
|
||||
global _db_pool
|
||||
if _db_pool is not None:
|
||||
return
|
||||
|
||||
_db_pool = pooling.MySQLConnectionPool(
|
||||
pool_name="meudon_pool",
|
||||
pool_size=DB_POOL_SIZE,
|
||||
host=DB_HOST,
|
||||
user=DB_USER,
|
||||
password=DB_PASS,
|
||||
database=DB_NAME,
|
||||
autocommit=False,
|
||||
)
|
||||
logging.info("Pool MySQL initialisé (db=%s, size=%s)", DB_NAME, DB_POOL_SIZE)
|
||||
|
||||
|
||||
def insert_temperature(sonde: str, temperature: float) -> None:
|
||||
"""
|
||||
Insère une mesure dans la table Sondes.Meudon.
|
||||
La colonne Date utilise CURRENT_TIMESTAMP par défaut dans MySQL.
|
||||
"""
|
||||
try:
|
||||
conn = mysql.connector.connect(
|
||||
host=DB_HOST,
|
||||
user=DB_USER,
|
||||
password=DB_PASS,
|
||||
database=DB_NAME,
|
||||
)
|
||||
"""Insère un relevé dans la table configurée."""
|
||||
if _db_pool is None:
|
||||
init_db_pool()
|
||||
|
||||
conn: Optional[MySQLConnectionAbstract] = None
|
||||
cursor: Optional[MySQLCursor] = None
|
||||
|
||||
try:
|
||||
assert _db_pool is not None # pour le typage
|
||||
conn = cast(MySQLConnectionAbstract, _db_pool.get_connection())
|
||||
cursor = conn.cursor()
|
||||
sql = f"INSERT INTO {TABLE_NAME} (Sonde, Temperature) VALUES (%s, %s)"
|
||||
|
||||
# Colonnes supposées : Sonde, Temperature, Date
|
||||
sql = f"INSERT INTO `{DB_TABLE}` (Sonde, Temperature, Date) VALUES (%s, %s, NOW())"
|
||||
cursor.execute(sql, (sonde, temperature))
|
||||
conn.commit()
|
||||
|
||||
logging.info("Insertion OK (Meudon) -> %s = %.2f", sonde, temperature)
|
||||
|
||||
except Error as e:
|
||||
logging.exception("Erreur MySQL (Meudon) pour la sonde %s : %s", sonde, e)
|
||||
|
||||
finally:
|
||||
except Exception as exc:
|
||||
logging.error("Erreur MySQL (insert %s): %s", sonde, exc)
|
||||
# En cas d'erreur, on rollback pour garder la connexion saine
|
||||
try:
|
||||
if cursor:
|
||||
cursor.close()
|
||||
if conn and conn.is_connected():
|
||||
conn.close()
|
||||
if conn is not None:
|
||||
conn.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
finally:
|
||||
try:
|
||||
if cursor is not None:
|
||||
cursor.close()
|
||||
finally:
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
|
||||
|
||||
# =========================
|
||||
# Callbacks MQTT (API v2)
|
||||
# MQTT callbacks
|
||||
# =========================
|
||||
def on_connect(client, userdata, flags, reason_code, properties=None):
|
||||
|
||||
def _topic_to_sonde(topic: str) -> str:
|
||||
"""Convertit un topic MQTT en nom de sonde.
|
||||
Exemple : 'Meudon/Chambre1' -> 'Chambre1'
|
||||
"""
|
||||
parts = topic.split("/")
|
||||
if len(parts) >= 2:
|
||||
return parts[-1].strip()
|
||||
return topic.strip()
|
||||
|
||||
|
||||
def _payload_to_float(payload: bytes) -> Optional[float]:
|
||||
"""Convertit un payload en float (accepte virgule)."""
|
||||
try:
|
||||
s = payload.decode("utf-8", errors="replace").strip()
|
||||
if not s:
|
||||
return None
|
||||
s = s.replace(",", ".")
|
||||
return float(s)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def on_connect(
|
||||
client: mqtt.Client,
|
||||
_userdata: Any,
|
||||
_flags: Any,
|
||||
reason_code: int,
|
||||
properties: Any = None,
|
||||
) -> None:
|
||||
# `properties` est fourni par l'API v2 ; on le garde pour compatibilité.
|
||||
_ = properties # évite les avertissements "non utilisé"
|
||||
|
||||
if reason_code == 0:
|
||||
logging.info("Connecté au broker MQTT Meudon (%s)", MQTT_HOST)
|
||||
# Abonnement à TOUT ce qui commence par "Meudon/"
|
||||
logging.info("Connecté au broker MQTT Meudon (%s:%s)", MQTT_HOST, MQTT_PORT)
|
||||
result, mid = client.subscribe("Meudon/#")
|
||||
logging.info("Abonné au topic : Meudon/# (result=%s, mid=%s)", result, mid)
|
||||
else:
|
||||
logging.error("Échec de connexion MQTT (Meudon), code retour = %s", reason_code)
|
||||
logging.error("Échec de connexion MQTT (Meudon), reason_code=%s", reason_code)
|
||||
|
||||
|
||||
def on_message(client, userdata, msg: mqtt.MQTTMessage):
|
||||
topic = msg.topic
|
||||
payload_raw = msg.payload.decode("utf-8", errors="ignore").strip()
|
||||
def on_disconnect(
|
||||
_client: mqtt.Client,
|
||||
_userdata: Any,
|
||||
reason_code: int,
|
||||
properties: Any = None,
|
||||
) -> None:
|
||||
_ = properties
|
||||
logging.warning("Déconnecté du broker MQTT (reason_code=%s)", reason_code)
|
||||
|
||||
logging.debug("Msg reçu (Meudon) : topic=%s payload=%s", topic, payload_raw)
|
||||
|
||||
# On ignore le gyrophare
|
||||
if topic == GYRO_TOPIC_MEUDON:
|
||||
logging.debug("Topic gyrophare Meudon ignoré : %s", topic)
|
||||
def on_message(_client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
|
||||
topic = (msg.topic or "").strip()
|
||||
sonde = _topic_to_sonde(topic)
|
||||
temp = _payload_to_float(msg.payload)
|
||||
|
||||
if temp is None:
|
||||
logging.warning("Payload non numérique ignoré (topic=%s, payload=%r)", topic, msg.payload)
|
||||
return
|
||||
|
||||
# Nom de la sonde = dernier segment du topic
|
||||
sonde = topic.split("/")[-1] if "/" in topic else topic
|
||||
|
||||
# Conversion du payload en float
|
||||
try:
|
||||
value = float(payload_raw.replace(",", "."))
|
||||
except ValueError:
|
||||
logging.warning(
|
||||
"Payload non numérique (Meudon), mesure ignorée (topic=%s, payload=%s)",
|
||||
topic,
|
||||
payload_raw,
|
||||
)
|
||||
return
|
||||
|
||||
insert_temperature(sonde, value)
|
||||
logging.info("Reçu: topic=%s -> sonde=%s | température=%s", topic, sonde, temp)
|
||||
insert_temperature(sonde, temp)
|
||||
|
||||
|
||||
# =========================
|
||||
# Programme principal
|
||||
# Main
|
||||
# =========================
|
||||
def main():
|
||||
setup_logging()
|
||||
logging.info("Démarrage du script Mqtt_meudon")
|
||||
|
||||
# Vérif minimale des variables d'env MySQL
|
||||
for var in ["DB_HOST", "DB_USER", "DB_PASS", "DB_NAME"]:
|
||||
if os.getenv(var) in (None, ""):
|
||||
logging.error("Variable d'environnement %s manquante !", var)
|
||||
def build_mqtt_client() -> mqtt.Client:
|
||||
client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)
|
||||
|
||||
# Vérif minimale des variables d'env MQTT
|
||||
if not MQTT_HOST:
|
||||
logging.error("MQTT_HOST_MEUDON manquant !")
|
||||
if not MQTT_USER:
|
||||
logging.warning("MQTT_USER_MEUDON non défini (connexion sans login ?)")
|
||||
if not MQTT_PORT or MQTT_PORT <= 0:
|
||||
logging.error("MQTT_PORT_MEUDON invalide : %s", MQTT_PORT)
|
||||
|
||||
client = mqtt.Client(
|
||||
client_id=MQTT_CLIENT_ID,
|
||||
callback_api_version=CallbackAPIVersion.VERSION2
|
||||
)
|
||||
client.username_pw_set(MQTT_USER, MQTT_PASS)
|
||||
if MQTT_USER:
|
||||
client.username_pw_set(MQTT_USER, MQTT_PASS)
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.on_disconnect = on_disconnect
|
||||
client.on_message = on_message
|
||||
|
||||
try:
|
||||
client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
|
||||
except Exception as e:
|
||||
logging.exception("Impossible de se connecter au broker MQTT Meudon : %s", e)
|
||||
return
|
||||
# Reconnexion automatique
|
||||
client.reconnect_delay_set(min_delay=1, max_delay=30)
|
||||
|
||||
return client
|
||||
|
||||
|
||||
def main() -> None:
|
||||
setup_logging()
|
||||
init_db_pool()
|
||||
|
||||
client = build_mqtt_client()
|
||||
|
||||
while True:
|
||||
try:
|
||||
logging.info("Connexion MQTT en cours (%s:%s)...", MQTT_HOST, MQTT_PORT)
|
||||
client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
|
||||
break
|
||||
except Exception as exc:
|
||||
logging.error("Impossible de se connecter au broker MQTT: %s", exc)
|
||||
time.sleep(5)
|
||||
|
||||
logging.info("Boucle MQTT Meudon en cours (Ctrl+C pour arrêter)...")
|
||||
try:
|
||||
client.loop_forever()
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Arrêt demandé par l'utilisateur (Meudon).")
|
||||
logging.info("Arrêt demandé par l'utilisateur (Meudon)." )
|
||||
finally:
|
||||
client.disconnect()
|
||||
try:
|
||||
client.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
logging.info("Client MQTT Meudon déconnecté.")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user