🆕 Maj du 19/04/2025

This commit is contained in:
2025-04-19 10:31:32 +02:00
parent c460925f9f
commit 0d3c43471a
14 changed files with 1301 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
import requests
import json
# Remplace par ton token de bot Telegram
token = "8128378340:AAF2sO3gaH1XpMNya_pEslzerqokoCiFRGs"
url = f"https://api.telegram.org/bot{token}/getUpdates"
try:
response = requests.get(url)
data = response.json()
print("🔍 Récupération des chats Telegram récents...\n")
if "result" in data:
chats = set()
for update in data["result"]:
message = update.get("message") or update.get("edited_message")
if not message:
continue
chat = message["chat"]
chat_id = chat["id"]
chat_title = chat.get("title") or chat.get("username") or chat.get("first_name") or "Inconnu"
if chat_id not in chats:
chats.add(chat_id)
print(f"➡️ Chat : {chat_title} | ID : {chat_id}")
if not chats:
print("⚠️ Aucun chat trouvé. Envoie un message depuis un groupe ou une discussion avec le bot.")
else:
print("❌ Erreur : aucune donnée 'result' trouvée dans la réponse.")
except Exception as e:
print(f"Erreur lors de la récupération des chats : {e}")

View File

@@ -0,0 +1,30 @@
import paho.mqtt.client as mqttClient
client = mqttClient.Client()
import mysql.connector
import sys
sys.path.insert(0, "/myenv/lib/python3.11.2/site-packages")
# Configuration de la connexion MySQL
mydb = mysql.connector.connect(
host="54.36.188.119",
user="michel",
password="#SO2&1nf%mZ@jfh",
database="Sondes"
)
# Fonction de callback quand un message est reçu
def on_message(_client, _userdata, msg):
print(f"Message reçu sur {msg.topic}: {msg.payload.decode()}")
cursor = mydb.cursor()
frigo_name = msg.topic.split('/')[-1] # Prend la dernière partie après le "/"
sql = "INSERT INTO Meudon (Sonde, Temperature) VALUES (%s, %s)"
val = (frigo_name, msg.payload.decode())
cursor.execute(sql, val)
mydb.commit()
# Configuration du client MQTT
client.username_pw_set("Bwps", "scJ5ACj2keRfI^")
client.on_message = on_message
client.connect("54.36.188.119", 1883, 60)
client.subscribe("Meudon/#") # S'abonner à tous les topics commençant par Saclay
client.loop_forever() # Rester connecté en continu pour écouter les messages

View File

@@ -0,0 +1,30 @@
import paho.mqtt.client as mqttClient
client = mqttClient.Client()
import mysql.connector
import sys
sys.path.insert(0, "/myenv/lib/python3.11.2/site-packages")
# Configuration de la connexion MySQL
mydb = mysql.connector.connect(
host="54.36.188.119",
user="michel",
password="#SO2&1nf%mZ@jfh",
database="Sondes"
)
# Fonction de callback quand un message est reçu
def on_message(_client, _userdata, msg):
print(f"Message reçu sur {msg.topic}: {msg.payload.decode()}")
cursor = mydb.cursor()
frigo_name = msg.topic.split('/')[-1] # Prend la dernière partie après le "/"
sql = "INSERT INTO Saclay (Sonde, Temperature) VALUES (%s, %s)"
val = (frigo_name, msg.payload.decode())
cursor.execute(sql, val)
mydb.commit()
# Configuration du client MQTT
client.username_pw_set("Bwps", "scJ5ACj2keRfI^")
client.on_message = on_message
client.connect("54.36.188.119", 1883, 60)
client.subscribe("Saclay/#") # S'abonner à tous les topics commençant par Saclay
client.loop_forever() # Rester connecté en continu pour écouter les messages

127
Backup_Synology/Monitor.py Normal file
View File

@@ -0,0 +1,127 @@
# Surveillance continue avec envoi d'alertes par email + log CSV
import mysql.connector
from datetime import datetime, timedelta
import time
import smtplib
from email.mime.text import MIMEText
import pandas as pd
# --- Config MySQL ---
config = {
"host": "54.36.188.119",
"user": "michel",
"password": "#SO2&1nf%mZ@jfh",
"database": "Sondes"
}
# --- Destinataires email ---
destinataires = ['services@domo91.fr']
# --- Fonction d'envoi de mail ---
def envoyer_mail(sujet, message, destinataires):
msg = MIMEText(message)
msg['Subject'] = sujet
msg['From'] = 'alertes_saclay@domo91.fr'
msg['To'] = ', '.join(destinataires)
try:
with smtplib.SMTP_SSL('smtp.mail.ovh.net', 465) as server:
server.login('alertes_saclay@domo91.fr', 'Kdpke674y23Feq^H')
server.sendmail(msg['From'], destinataires, msg.as_string())
print(f"📧 Mail envoyé à {destinataires}", flush=True)
except Exception as e:
print(f"Erreur envoi mail : {e}", flush=True)
# --- Fonction de surveillance ---
def surveiller():
log_entries = []
try:
conn = mysql.connector.connect(**config)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT DISTINCT Lieu FROM Chambres_froides")
lieux = [row['Lieu'] for row in cursor.fetchall()]
for lieu in lieux:
table_temp = lieu
table_alertes = f"Alertes_{lieu}"
cursor.execute("SELECT Sonde, Temp_Max FROM Chambres_froides WHERE Lieu=%s AND Etat='ON'", (lieu,))
sondes = cursor.fetchall()
for sonde in sondes:
nom_sonde = sonde['Sonde']
seuil = sonde['Temp_Max']
cursor.execute(f"""
SELECT Date, Temperature FROM {table_temp}
WHERE Sonde = %s
ORDER BY Date DESC LIMIT 6
""", (nom_sonde,))
relevés = cursor.fetchall()
# Log CSV : tous les relevés analysés
for r in relevés:
log_entries.append({
"Date": r['Date'],
"Lieu": lieu,
"Sonde": nom_sonde,
"Température": r['Temperature'],
"Seuil": seuil,
"État": "Dépassement" if r['Temperature'] > seuil else "Normal"
})
if len(relevés) == 6:
toutes_hors_seuil = all(r['Temperature'] > seuil for r in relevés)
plus_ancien = relevés[-1]['Date']
maintenant = datetime.now()
if toutes_hors_seuil and (maintenant - plus_ancien >= timedelta(minutes=30)):
cursor.execute(f"""
SELECT COUNT(*) as total FROM {table_alertes}
WHERE Sonde=%s AND Status='En cours'
""", (nom_sonde,))
en_cours = cursor.fetchone()
if en_cours['total'] == 0:
cursor.execute(
f"INSERT INTO {table_alertes} (Sonde, Debut_defaut, Status) VALUES (%s, NOW(), 'En cours')",
(nom_sonde,)
)
print(f"🚨 Alerte déclenchée pour {nom_sonde} ({lieu})", flush=True)
sujet = f"🚨 ALERTE TEMPÉRATURE - {nom_sonde} ({lieu})"
message = (
f"La sonde '{nom_sonde}' du site '{lieu}' a dépassé le seuil de {seuil}°C "
f"depuis plus de 30 minutes.\nHeure : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
envoyer_mail(sujet, message, destinataires)
# Acquittement automatique
cursor.execute(f"""
SELECT Temperature FROM {table_temp}
WHERE Sonde = %s
ORDER BY Date DESC LIMIT 1
""", (nom_sonde,))
derniere = cursor.fetchone()
if derniere and derniere['Temperature'] <= seuil:
cursor.execute(f"""
UPDATE {table_alertes}
SET Status = 'Acquitté'
WHERE Sonde = %s AND Status = 'En cours'
""", (nom_sonde,))
conn.commit()
cursor.close()
conn.close()
# Enregistrer le log
if log_entries:
df_logs = pd.DataFrame(log_entries)
df_logs.to_csv("/home/debian/travail/Logs/monitor.csv", sep=";", index=False)
except Exception as e:
print(f"Erreur : {e}", flush=True)
# --- Boucle principale ---
while True:
print(f"📡 Vérification à {datetime.now()}", flush=True)
surveiller()
time.sleep(300) # 5 minutes

52
Backup_Synology/README.md Normal file
View File

@@ -0,0 +1,52 @@
# 🌡️ Gestion des sondes domotiques
[![Version Python](https://img.shields.io/badge/Python-3.10-blue)](https://python.org)
[![Licence](https://img.shields.io/badge/Licence-Propriétaire-orange)](https://mj91.fr:448/michel/Gestion_sondes)
[![Gitea](https://img.shields.io/badge/Gitea-auto--hébergé-green)](https://mj91.fr:448)
Application de **surveillance des températures** avec alertes, visualisation Streamlit, et déploiement automatisé via Gitea + Supervisor.
---
## 🧩 Fonctionnalités principales
- 🔍 Lecture de capteurs **DS18B20** et **DHT22**
- 📨 Transmission via **MQTT**
- 📊 Interface **Streamlit** (app.domo91.fr)
- 🔔 Alertes **email / Telegram** si dépassement > 30 minutes
- 🧠 Déploiement auto avec **`deploy.sh`**
- 🧾 Stockage SQL sur **MySQL (VPS)**
---
## 📊 Exemple de visualisation
Voici un aperçu dun graphique dans linterface Streamlit :
![Exemple de graphique Streamlit](https://upload.wikimedia.org/wikipedia/commons/thumb/3/3a/Matplotlib_figure.svg/800px-Matplotlib_figure.svg.png)
---
## 🗂️ Structure du projet
| Fichier | Description |
|-----------------------------|---------------------------------------------------------------------|
| `Monitor.py` | Analyse de température et alertes |
| `Streamlit.py` | Interface graphique web |
| `Cuisine_saclay.py` | Script capteur pour le site de Saclay |
| `Cuisine_meudon.py` | Script capteur pour Meudon |
| `check_supervisor.py` | Vérifie létat des scripts supervisés |
| `deploy.sh` | Déploiement auto depuis Gitea (branche `product`) |
| `requirements.txt` | Dépendances Python |
---
## 🧪 Installation locale
```bash
git clone https://mj91.fr:448/michel/Gestion_sondes.git
cd Gestion_sondes
python -m venv .venv
source .venv/bin/activate # Linux/macOS
.venv\Scripts\activate # Windows
pip install -r requirements.txt

View File

@@ -0,0 +1,107 @@
# Insertion de données da l'app SG.
import streamlit as st
import mysql.connector
from dotenv import load_dotenv
import os
from datetime import datetime
load_dotenv()
st.set_page_config(page_title="Insertion Mysql", layout="centered")
st.title("🗓️ Insertion dans une base MySQL")
# Charger les identifiants
DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
# Connexion sans DB initiale pour lister les bases
@st.cache_data
def get_databases():
conn = mysql.connector.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD
)
cursor = conn.cursor()
cursor.execute("SHOW DATABASES")
bases = [db[0] for db in cursor.fetchall()]
conn.close()
return bases
# Connexion avec DB sélectionnée
def connect_to_db(db_name):
return mysql.connector.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
database=db_name
)
# --- Interface principale ---
base = st.selectbox("Choisir une base de données", get_databases())
if base:
conn = connect_to_db(base)
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
table = st.selectbox("Choisir une table", tables)
if table:
cursor.execute(f"DESCRIBE {table}")
colonnes = cursor.fetchall()
# Détection clé primaire
cursor.execute(f"SHOW INDEX FROM {table} WHERE Key_name = 'PRIMARY'")
pk = cursor.fetchone()
primary_key = pk[4] if pk else None
champ_date_candidates = [col[0] for col in colonnes if "date" in col[1].lower()]
champ_date = st.selectbox("Champ de date", champ_date_candidates)
nb_mois = st.number_input("Nombre d'insertions mensuelles", 1, 36, 6)
date_depart = st.date_input("Date de départ des insertions", value=datetime.today().date())
st.subheader("Champs à insérer (hors clé primaire et champ de date)")
champs_choisis = []
champs_editables = [col for col in colonnes if col[0] != primary_key and col[0] != champ_date]
for col in champs_editables:
if st.checkbox(f"{col[0]} ({col[1]})", value=True):
champs_choisis.append(col[0])
with st.form("formulaire_insertion"):
st.markdown("**Valeurs fixes pour les champs cochés**")
valeurs_fixes = {}
for nom in champs_choisis:
val = st.text_input(nom)
valeurs_fixes[nom] = val if val != "" else None
submit = st.form_submit_button("Insérer")
try:
cursor = conn.cursor()
insert_count = 0
for i in range(nb_mois):
year = date_depart.year + (date_depart.month + i - 1) // 12
month = (date_depart.month + i - 1) % 12 + 1
day = min(date_depart.day, 28)
date_cible = datetime(year, month, day).date()
champs = [champ_date] + list(valeurs_fixes.keys())
valeurs = [date_cible] + list(valeurs_fixes.values())
sql = f"INSERT INTO {table} ({', '.join(champs)}) VALUES ({', '.join(['%s'] * len(valeurs))})"
cursor.execute(sql, valeurs)
insert_count += 1
conn.commit()
st.success(f"{insert_count} lignes insérées avec succès dans `{table}`.")
except Exception as e:
st.error(f"❌ Erreur lors de linsertion : {e}")
conn.commit()
st.success(f"{insert_count} lignes insérées avec succès dans `{table}`.")
st.success(f"{insert_count} lignes insérées avec succès.")

View File

@@ -0,0 +1,24 @@
# Programme de test de la BAL alertes_saclay@domo91.fr.
import smtplib
from email.mime.text import MIMEText
def envoyer_mail(sujet, message, destinataires):
msg = MIMEText(message)
msg['Subject'] = sujet
msg['From'] = 'alertes_saclay@domo91.fr'
msg['To'] = ', '.join(destinataires)
try:
with smtplib.SMTP_SSL('smtp.mail.ovh.net', 465) as server:
server.login('alertes_saclay@domo91.fr', 'Kdpke674y23Feq^H')
server.sendmail(msg['From'], destinataires, msg.as_string())
print(f"📧 Mail envoyé à {destinataires}")
except Exception as e:
print(f"Erreur envoi mail : {e}")
# --- Test d'envoi ---
envoyer_mail(
sujet="🧪 Test d'envoi d'alerte",
message="Ceci est un test de l'envoi d'e-mail via alertes_saclay@domo91.fr.",
destinataires=["services@domo91.fr"]
)

106
Backup_Synology/Tracker.py Normal file
View File

@@ -0,0 +1,106 @@
# Programme de récupération des adresses de sondes DS18B20 et envoie dans table Sondes.Tracker.
import paho.mqtt.client as mqtt
import mysql.connector
import re # Import de la bibliothèque des expressions régulières
# Configuration de la base de données MySQL
db_config = {
'user': 'superviseur', # Remplacez par votre utilisateur MySQL
'password': 'Bto7Lm_z]m!BFH!*', # Remplacez par votre mot de passe MySQL
'host': '54.36.188.119', # Adresse de votre serveur MySQL (localhost si en local)
'database': 'Sondes' # Nom de la base de données MySQL
}
def convertir_rom_id_en_hexa(rom_id):
# On découpe la chaîne rom_id en paires de deux caractères
hex_parts = [f"0x{rom_id[i:i + 2]}" for i in range(0, len(rom_id), 2)]
return ",".join(hex_parts)
# Connexion à la base de données MySQL
def connect_db():
return mysql.connector.connect(**db_config)
# Fonction pour vérifier si la sonde est déjà présente dans la base
def sonde_deja_presente(rom_id):
conn = connect_db()
cursor = conn.cursor()
try:
# Vérifier si la sonde existe déjà dans la base
cursor.execute("SELECT COUNT(*) FROM Tracker WHERE rom_id = %s", (rom_id,))
result = cursor.fetchone()
return result[0] > 0 # Si le nombre est supérieur à 0, la sonde est déjà présente
except mysql.connector.Error as err:
print(f"Erreur MySQL lors de la vérification de la sonde : {err}")
return False
finally:
cursor.close()
conn.close()
# Fonction pour insérer les sondes dans la base de données
def inserer_sonde(rom_id):
if sonde_deja_presente(rom_id):
print(f"Sonde {rom_id} déjà présente dans la base de données. Aucune insertion effectuée.")
return
conn = connect_db()
cursor = conn.cursor()
try:
# Conversion en hexadécimal
rom_id_hexa = convertir_rom_id_en_hexa(rom_id)
# Insertion de rom_id et de sa version hexa dans la base
cursor.execute("INSERT INTO Tracker (rom_id, hexa) VALUES (%s, %s)", (rom_id, rom_id_hexa))
conn.commit()
print(f"Sonde {rom_id} insérée dans la base de données avec son format hexa {rom_id_hexa}.")
except mysql.connector.Error as err:
print(f"Erreur MySQL : {err}")
finally:
cursor.close()
conn.close()
# Fonction de callback pour gérer les messages MQTT
def on_message(_client, _userdata, message):
payload = message.payload.decode()
print(f"Message reçu : {payload}")
# Utilisation de re pour extraire les ROM IDs (Sonde [rom_id])
rom_ids = re.findall(r"[0-9a-fA-F]{16}", payload) # Capturer les IDs de 16 caractères
if rom_ids:
for rom_id in rom_ids:
inserer_sonde(rom_id)
else:
print("Aucun ROM ID trouvé dans le message.")
# Configuration du client MQTT
mqtt_broker = "46.105.92.116"
mqtt_port = 1883 # Vérifiez si votre broker utilise un autre port
mqtt_user = "Bwps"
mqtt_password = "scJ5ACj2keRfI^"
mqtt_topic = "Tracker/sondes"
client = mqtt.Client()
# Authentification MQTT
client.username_pw_set(mqtt_user, mqtt_password)
# Configuration du callback pour les messages MQTT
client.on_message = on_message
# Connexion au broker MQTT
client.connect(mqtt_broker, mqtt_port, 60)
# Souscription au topic Tracker/sondes
client.subscribe(mqtt_topic)
# Démarrer la boucle MQTT
client.loop_forever()

331
Backup_Synology/domo91.py Normal file
View File

@@ -0,0 +1,331 @@
# Application Gestion de sondes
import streamlit as st
import mysql.connector
import pandas as pd
from datetime import date
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from fpdf import FPDF
import os
import random
st.set_page_config(page_title="Domo91 - Surveillance", layout="wide")
st.title("📡 Supervision Températures")
# --- Configuration base de données ---
db_config = {
"host": "54.36.188.119",
"user": "michel",
"password": "#SO2&1nf%mZ@jfh",
"database": "Sondes"
}
# --- Fonction de génération PDF ---
def generer_pdf(site, date_str):
st.info(f"Génération du rapport PDF pour {site} à la date {date_str}")
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute(f"SELECT Sonde, Date, Temperature FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", (date_str,))
rows = cursor.fetchall()
df = pd.DataFrame(rows)
df["Heure"] = pd.to_datetime(df["Date"]).dt.strftime("%H:%M")
releves = {}
for sonde in df["Sonde"].unique():
df_sonde = df[df["Sonde"] == sonde]
releves[sonde] = list(zip(df_sonde["Heure"], df_sonde["Temperature"]))
table_alertes = f"Alertes_{site}"
cursor.execute(f"SELECT Sonde, Debut_defaut, Status FROM {table_alertes} WHERE DATE(Debut_defaut) = %s", (date_str,))
alertes = cursor.fetchall()
cursor.close()
conn.close()
class RapportPDF(FPDF):
def header(self):
self.set_font("Arial", "B", 14)
self.cell(0, 10, "Rapport de surveillance des sondes", ln=1, align="C")
self.set_font("Arial", "", 12)
self.cell(0, 10, f"Date : {date_str}", ln=1, align="C")
self.ln(5)
def site_info(self, site_name):
self.set_font("Arial", "B", 12)
self.cell(0, 10, f"Site : {site_name}", ln=1)
self.ln(2)
def releves_section(self, data):
self.set_font("Arial", "B", 12)
self.cell(0, 10, "Relevés de température", ln=1)
for sonde, mesures in data.items():
self.set_font("Arial", "B", 11)
self.cell(0, 8, f"Sonde : {sonde}", ln=1)
self.set_font("Arial", "", 10)
for heure, temp in mesures:
self.cell(0, 6, f"{heure} - {temp} °C", ln=1)
self.ln(2)
def alertes_section(self, data):
self.set_font("Arial", "B", 12)
self.cell(0, 10, "Alertes enregistrées", ln=1)
self.set_font("Arial", "", 10)
for a in data:
self.cell(0, 6, f"{a['Sonde']} - {a['Debut_defaut']} - {a['Status']}", ln=1)
pdf = RapportPDF()
pdf.add_page()
pdf.site_info(site)
pdf.releves_section(releves)
pdf.alertes_section(alertes)
file_name = f"rapport_{site}_{date_str}.pdf"
output_dir = "PDF"
os.makedirs(output_dir, exist_ok=True) # 🔧 Crée le dossier si absent
output_path = os.path.join(output_dir, file_name)
pdf.output(output_path)
with open(output_path, "rb") as f:
st.download_button(
label="📥 Télécharger le rapport PDF",
data=f,
file_name=file_name,
mime="application/pdf"
)
except Exception as e:
st.error(f"Erreur lors de la génération du PDF : {e}")
# --- Initialisation session ---
if "authenticated" not in st.session_state:
st.session_state["authenticated"] = False
st.session_state["role"] = None
st.session_state["lieu_autorise"] = None
# --- Sidebar (connexion + bouton PDF) ---
with st.sidebar:
st.header("🔐 Connexion")
if not st.session_state.get("authenticated"):
login = st.text_input("Nom d'utilisateur")
password = st.text_input("Mot de passe", type="password")
if st.button("Se connecter"):
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (login,))
result = cursor.fetchone()
if result and result["mot_de_passe"] == password:
st.session_state["authenticated"] = True
st.session_state["role"] = result["role"]
st.session_state["lieu_autorise"] = result["Lieu"]
st.success(f"Connecté comme {result['role']} ({result['Lieu']})")
else:
st.error("Identifiants invalides")
cursor.close()
conn.close()
except Exception as e:
st.error(f"Erreur lors de la connexion à la base : {e}")
else:
st.success(f"Connecté ({st.session_state['role']})")
if st.button("🔓 Déconnexion"):
st.session_state["authenticated"] = False
st.session_state["role"] = None
st.session_state["lieu_autorise"] = None
st.rerun()
st.markdown("---")
st.subheader("📄 Rapport PDF")
if "selected_date" in st.session_state:
if st.button("📥 Télécharger létat du jour (PDF)"):
site = st.session_state["lieu_autorise"]
date_val = st.session_state["selected_date"].strftime("%Y-%m-%d")
generer_pdf(site, date_val)
else:
st.info("Sélectionnez une date pour activer la génération PDF.")
# --- CONTENU PRINCIPAL SI AUTHENTIFIÉ ---
if st.session_state["authenticated"]:
st.markdown("## Sélection du site et de la date")
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
sites_possibles = ["Saclay", "Meudon"]
if st.session_state["role"] == "superviseur":
site_selectionne = st.selectbox("📍 Choisissez un site :", sites_possibles)
else:
site_selectionne = st.session_state["lieu_autorise"]
st.info(f"Site imposé : {site_selectionne}")
selected_date = st.date_input("📅 Date du relevé", value=date.today())
st.session_state["selected_date"] = selected_date
cursor.execute(
f"SELECT * FROM `{site_selectionne}` WHERE DATE(Date) = %s ORDER BY Sonde, Date",
(selected_date.strftime("%Y-%m-%d"),)
)
rows = cursor.fetchall()
if rows:
df = pd.DataFrame(rows)
df["Date"] = pd.to_datetime(df["Date"])
sondes = sorted(df["Sonde"].unique())
sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes)
df_sonde = df[df["Sonde"] == sonde_choisie]
# Ajouter une colonne Heure pour faciliter les filtres
df_sonde.loc[:, "Heure"] = df_sonde["Date"].dt.hour
# Filtrage par tranche horaire
tranche = st.radio("🕒 Tranche horaire :",
["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"])
if tranche == "Matin (6h-12h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)]
elif tranche == "Après-midi (12h-18h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)]
elif tranche == "Nuit (18h-6h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)]
df_sonde = df_sonde.copy()
cursor.execute("SELECT Temp_Max FROM Chambres_froides WHERE Lieu = %s AND Sonde = %s", (site_selectionne, sonde_choisie))
seuil = cursor.fetchone()
seuil_temp = seuil["Temp_Max"] if seuil else 10
st.subheader("📊 Tableau des relevés")
df_filtré = df_sonde.copy()
df_filtré = df_filtré.drop(columns="Id", errors="ignore")
def surlignage_temp(val):
try:
if float(val) > seuil_temp:
return "color: red; font-weight: bold"
except:
pass
return ""
# Appliquer le style uniquement à la colonne "Temperature"
styled_df = df_filtré.style.applymap(surlignage_temp, subset=["Temperature"])
st.dataframe(styled_df, use_container_width=True)
st.subheader("📈 Évolution de la température")
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(df_filtré["Date"], df_filtré["Temperature"], marker='o', label="Température")
ax.axhline(seuil_temp, color='red', linestyle='--', label=f"Seuil {seuil_temp}°C")
ax.set_xlabel("Heure")
ax.set_ylabel("Température (°C)")
ax.set_title(f"{sonde_choisie} - {selected_date.strftime('%d/%m/%Y')}")
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.legend()
st.pyplot(fig)
# --- Affichage automatique des alertes non acquittées ---
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
table_alertes = f"Alertes_{site_selectionne}"
cursor.execute(
f"SELECT Sonde, Debut_defaut, Status FROM `{table_alertes}` WHERE Status != 'Acquitté' ORDER BY Debut_defaut DESC"
)
alertes = cursor.fetchall()
if alertes:
df_alertes = pd.DataFrame(alertes)
st.subheader("🚨 Alertes non acquittées")
st.dataframe(df_alertes, use_container_width=True)
else:
st.success("✅ Aucune alerte en cours.")
cursor.close()
conn.close()
except Exception as e:
st.error(f"Erreur lors de la récupération des alertes : {e}")
except Exception as e:
st.error(f"Erreur MySQL : {e}")
if st.session_state["role"] == "superviseur":
with st.expander(" Ajouter une nouvelle chambre froide", expanded=False):
with st.form("ajout_sonde"):
nouvelle_sonde = st.text_input("Nom de la sonde")
temp_max = st.number_input("Température maximale autorisée (°C)", value=4)
etat_on = st.checkbox("État actif (ON)", value=True)
submitted = st.form_submit_button("✅ Ajouter")
if submitted:
try:
conn_add = mysql.connector.connect(**db_config)
cursor_add = conn_add.cursor()
cursor_add.execute(
"INSERT INTO Chambres_froides (Lieu, Sonde, Temp_Max, Etat) VALUES (%s, %s, %s, %s)",
(site_selectionne, nouvelle_sonde, temp_max, "ON" if etat_on else "OFF")
)
conn_add.commit()
cursor_add.close()
conn_add.close()
st.success(f"Sonde '{nouvelle_sonde}' ajoutée avec succès au site {site_selectionne} 🎉")
# Refresh immédiat
st.session_state["refresh_admin"] = random.randint(1000, 9999)
except Exception as e:
st.error(f"Erreur lors de l'ajout : {e}")
with st.expander("🛠️ Gestion des chambres froides (administrateur)", expanded=True):
# Bouton d'actualisation
if st.button("🔄 Actualiser la liste"):
st.session_state["refresh_admin"] = random.randint(0, 9999) # Force le rerun
try:
conn_admin = mysql.connector.connect(**db_config)
cursor_admin = conn_admin.cursor(dictionary=True)
cursor_admin.execute("SELECT * FROM Chambres_froides WHERE Lieu = %s", (site_selectionne,))
chambres = cursor_admin.fetchall()
if not chambres:
st.warning("Aucune chambre froide pour ce site.")
else:
for chambre in chambres:
col1, col2, col3 = st.columns([3, 1, 2])
with col1:
st.markdown(f"**{chambre['Sonde']}**")
with col2:
etat = st.checkbox("ON", value=(chambre["Etat"] == "ON"),
key=f"etat_{chambre['Id']}_{st.session_state.get('refresh_admin', 0)}")
new_etat = "ON" if etat else "OFF"
with col3:
temp_max = chambre["Temp_Max"]
moins, val, plus = st.columns([1, 2, 1])
with moins:
if st.button("", key=f"moins_{chambre['Id']}"):
temp_max -= 1
with val:
st.markdown(f"<div style='text-align:center;font-size:20px'>{temp_max}°C</div>",
unsafe_allow_html=True)
with plus:
if st.button("", key=f"plus_{chambre['Id']}"):
temp_max += 1
if new_etat != chambre["Etat"] or temp_max != chambre["Temp_Max"]:
cursor_admin.execute(
"UPDATE Chambres_froides SET Etat = %s, Temp_Max = %s WHERE Id = %s",
(new_etat, temp_max, chambre["Id"])
)
conn_admin.commit()
st.success(f"{chambre['Sonde']} mise à jour")
cursor_admin.close()
conn_admin.close()
except Exception as e:
st.error(f"Erreur SQL (admin) : {e}")