Merge branch 'develop' into product

This commit is contained in:
2025-07-08 12:28:35 +02:00

749
domo91.py
View File

@@ -4,485 +4,150 @@ import mysql.connector
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import matplotlib.dates as mdates import matplotlib.dates as mdates
from fpdf import FPDF
import os import os
import random
from dotenv import load_dotenv from dotenv import load_dotenv
from datetime import datetime, date, time from datetime import datetime, date
import bcrypt import bcrypt
import traceback
# Charger les variables d'environnement # Charger les variables d'environnement
load_dotenv() load_dotenv()
st.set_page_config(page_title="Domo91 - Surveillance", layout="wide") st.set_page_config(page_title="Domo91 - Surveillance", layout="wide")
if "authenticated" not in st.session_state:
st.session_state["authenticated"] = False # Initialisation session state avec valeurs sûres
st.session_state["role"] = None for key, default in {
st.session_state["lieu_autorise"] = None "authenticated": False,
"role": None,
"lieu_autorise": None,
"onglet_actif": "Accueil",
"selected_date": date.today(),
"selected_site": "Saclay",
"selected_periode": "Toute la journée",
}.items():
st.session_state.setdefault(key, default)
st.title("📡 Supervision Températures") st.title("📡 Supervision Températures")
# Configuration MySQL
db_config = { db_config = {
"host": os.getenv("DB_HOST"), "host": os.getenv("DB_HOST"),
"user": os.getenv("DB_USER"), "user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"), "password": os.getenv("DB_PASSWORD"),
"database": os.getenv("DB_NAME") "database": os.getenv("DB_NAME")
} }
def get_connection(): def get_connection():
return mysql.connector.connect(**db_config) return mysql.connector.connect(**db_config)
# --- Fonction de génération PDF ---
def generer_pdf(site, date_str, periode):
st.info(f"Génération du rapport PDF pour {site} à la date {date_str} ({periode})")
try:
conn = mysql.connector.connect(**db_config)
pdf_cursor = conn.cursor(dictionary=True)
# Requête principale def hash_password(plain_password):
pdf_cursor.execute(f"SELECT Sonde, Date, Temperature FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", (date_str,)) return bcrypt.hashpw(plain_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
rows = pdf_cursor.fetchall()
df = pd.DataFrame(rows)
df["Heure"] = pd.to_datetime(df["Date"]).dt.strftime("%H:%M")
df["Heure_obj"] = pd.to_datetime(df["Date"]).dt.time
# --- Filtrage par période ---
plages = {
"Toute la journée": (time(0, 0), time(23, 59)),
"Matin (6h-12h)": (time(6, 0), time(12, 0)),
"Après-midi (12h-18h)": (time(12, 0), time(18, 0)),
"Nuit (18h-6h)": (time(18, 0), time(6, 0)),
}
heure_debut, heure_fin = plages.get(periode, (time(0, 0), time(23, 59)))
if heure_debut < heure_fin:
df = df[(df["Heure_obj"] >= heure_debut) & (df["Heure_obj"] <= heure_fin)]
else:
df = df[(df["Heure_obj"] >= heure_debut) | (df["Heure_obj"] <= heure_fin)]
# --- Structuration des relevés ---
releves = {}
for sonde in df["Sonde"].unique():
df_sonde = df[df["Sonde"] == sonde]
releves[sonde] = list(zip(df_sonde["Heure"], df_sonde["Temperature"]))
# --- Requête alertes ---
table_alertes = f"Alertes_{site}"
pdf_cursor.execute(f"SELECT Sonde, Debut_defaut, Status FROM {table_alertes} WHERE DATE(Debut_defaut) = %s", (date_str,))
alertes = pdf_cursor.fetchall()
pdf_cursor.close()
conn.close()
# --- Classe PDF ---
class RapportPDF(FPDF):
def header(self):
self.set_font("Arial", "B", 14)
self.cell(0, 10, "Rapport de surveillance des sondes", ln=1, align="C")
self.set_font("Arial", "", 12)
self.cell(0, 10, f"Date : {date_str}", ln=1, align="C")
self.cell(0, 10, f"Periode : {getattr(self, 'periode', '')}", ln=1, align="C")
self.ln(5)
def site_info(self, site_name):
self.set_font("Arial", "B", 12)
self.cell(0, 10, f"Site : {site_name}", ln=1)
self.ln(2)
def releves_section(self, data):
self.set_font("Arial", "B", 12)
self.cell(0, 10, "Relevés de température", ln=1)
for sonde, mesures in data.items():
self.set_font("Arial", "B", 11)
self.cell(0, 8, f"Sonde : {sonde}", ln=1)
col1 = mesures[::2]
col2 = mesures[1::2]
self.set_font("Arial", "B", 10)
self.cell(40, 6, "Heure", border=1)
self.cell(30, 6, "Temp (°C)", border=1)
self.cell(20, 6, "", border=0)
self.cell(40, 6, "Heure", border=1)
self.cell(30, 6, "Temp (°C)", border=1)
self.ln()
self.set_font("Arial", "", 10)
for i in range(max(len(col1), len(col2))):
if i < len(col1):
h1, t1 = col1[i]
self.cell(40, 6, h1, border=1)
self.cell(30, 6, f"{t1:.2f}", border=1)
else:
self.cell(70, 6, "", border=0)
self.cell(20, 6, "", border=0)
if i < len(col2):
h2, t2 = col2[i]
self.cell(40, 6, h2, border=1)
self.cell(30, 6, f"{t2:.2f}", border=1)
self.ln()
self.ln(4)
def alertes_section(self, data):
self.set_font("Arial", "B", 12)
self.cell(0, 10, "Alertes enregistrées", ln=1)
self.set_font("Arial", "", 10)
for a in data:
self.cell(0, 6, f"{a['Sonde']} - {a['Debut_defaut']} - {a['Status']}", ln=1)
# --- Génération du PDF ---
pdf = RapportPDF()
pdf.periode = periode
pdf.add_page()
pdf.site_info(site)
pdf.releves_section(releves)
pdf.alertes_section(alertes)
file_name = f"rapport_{site}_{date_str}.pdf"
output_dir = "PDF"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, file_name)
pdf.output(output_path)
with open(output_path, "rb") as f:
st.download_button(
label="📥 Télécharger le rapport PDF",
data=f,
file_name=file_name,
mime="application/pdf"
)
except Exception as err1:
st.error(f"Erreur lors de la génération du PDF : {err1}")
# --- Initialisation des variables de session --- def verifier_password(input_password, hash_en_base):
if "authenticated" not in st.session_state: return bcrypt.checkpw(input_password.encode('utf-8'), hash_en_base.encode('utf-8'))
st.session_state["authenticated"] = False
if "role" not in st.session_state:
st.session_state["role"] = None
if "lieu_autorise" not in st.session_state:
st.session_state["lieu_autorise"] = None
# --- Connexion utilisateur dans la sidebar ---
st.sidebar.header("🔐 Connexion") # --- Connexion utilisateur ---
if not st.session_state.get("authenticated"): if not st.session_state["authenticated"]:
login = st.sidebar.text_input("Nom d'utilisateur") login = st.sidebar.text_input("Nom d'utilisateur")
password = st.sidebar.text_input("Mot de passe", type="password") password = st.sidebar.text_input("Mot de passe", type="password")
if st.sidebar.button("Se connecter"): if st.sidebar.button("Se connecter"):
try: try:
conn = mysql.connector.connect(**db_config) conn = get_connection()
cursor = conn.cursor(dictionary=True) cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (login,)) cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (login,))
result = cursor.fetchone() result = cursor.fetchone()
def verifier_password(mot_de_passe_saisi, hash_en_base):
return bcrypt.checkpw(mot_de_passe_saisi.encode('utf-8'), hash_en_base.encode('utf-8'))
if result and verifier_password(password, result["mot_de_passe"]): if result and verifier_password(password, result["mot_de_passe"]):
if result["Expiration"] and result["Expiration"] < date.today(): if result["Expiration"] and result["Expiration"] < date.today():
st.sidebar.error("Votre accès a expiré. Veuillez contacter un administrateur.") st.sidebar.error("Accès expiré.")
cursor.close() cursor.close()
conn.close() conn.close()
st.stop() st.stop()
st.session_state["authenticated"] = True st.session_state.update({
st.session_state["role"] = result["role"] "authenticated": True,
st.session_state["lieu_autorise"] = result["Lieu"] "role": result["role"],
st.success(f"Connecté comme {result['role']} ({result['Lieu']})") "lieu_autorise": result["Lieu"]
# Enregistrement de la connexion dans Connexion_Log })
try: now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") cursor.execute("INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion) VALUES (%s, %s, %s)",
cursor.execute(""" (login, result["Lieu"], now_str))
INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion) conn.commit()
VALUES (%s, %s, %s)
""", (login, result["Lieu"], now))
conn.commit()
except Exception as e:
st.warning(f"⚠️ Connexion enregistrée échouée : {e}")
st.rerun() st.rerun()
else: else:
st.sidebar.error("Identifiants invalides") st.sidebar.error("Identifiants invalides")
cursor.close() cursor.close()
conn.close() conn.close()
except Exception as e: except Exception as e:
st.sidebar.error(f"Erreur lors de la connexion à la base : {e}") st.sidebar.error(f"Erreur connexion : {e}")
else: else:
st.sidebar.success(f"Connecté ({st.session_state['role']})") st.sidebar.success(f"Connecté ({st.session_state['role']})")
if st.sidebar.button("🔓 Déconnexion", key="logout_sidebar"): if st.sidebar.button("🔓 Déconnexion"):
st.session_state["authenticated"] = False for key in ["authenticated", "role", "lieu_autorise"]:
st.session_state["role"] = None st.session_state[key] = False if key == "authenticated" else None
st.session_state["lieu_autorise"] = None
st.rerun() st.rerun()
def hash_password(password): # --- Navigation ---
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def ajouter_utilisateur(utilisateur, mot_de_passe, role, lieu, expiration):
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (utilisateur,))
if cursor.fetchone():
return False, "❌ Utilisateur déjà existant."
hash_mdp = hash_password(mot_de_passe)
cursor.execute("""
INSERT INTO MotsDePasse (utilisateur, mot_de_passe, role, Lieu, Expiration)
VALUES (%s, %s, %s, %s, %s)
""", (utilisateur, hash_mdp, role, lieu, expiration))
conn.commit()
cursor.close()
conn.close()
return True, "✅ Utilisateur ajouté avec succès."
except Exception as e:
return False, f"⚠️ Erreur : {e}"
def afficher_gestion_expiration(conn):
st.subheader("🔐 Gestion des expirations d'accès")
# Récupérer les utilisateurs avec leurs dates d'expiration
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT Id, utilisateur, Expiration FROM MotsDePasse")
users = cursor.fetchall()
cursor.close()
df = pd.DataFrame(users)
df['Expiration'] = pd.to_datetime(df['Expiration']).dt.date
for _, row in df.iterrows():
est_expire = row['Expiration'] < date.today()
fond = "#ffe6e6" if est_expire else "#f8f9fa"
with st.container():
st.markdown(f"<div style='background-color:{fond}; padding:10px; border-radius:8px;'>",
unsafe_allow_html=True)
col1, col2, col3 = st.columns([4, 2, 1])
with col1:
st.markdown(f"**Utilisateur :** {row['utilisateur']}")
if est_expire:
st.markdown("<span style='color:red; font-weight:bold;'>⛔ Accès expiré</span>",
unsafe_allow_html=True)
with col2:
new_date = st.date_input("Expiration", row['Expiration'], key=f"exp_{row['Id']}")
with col3:
if st.button("", key=f"save_{row['Id']}"):
try:
cursor = conn.cursor()
cursor.execute("UPDATE MotsDePasse SET Expiration = %s WHERE Id = %s",
(new_date, row['Id']))
conn.commit()
st.success(f"{row['utilisateur']} mis à jour")
cursor.close()
except Exception as e:
st.error(f"Erreur : {e}")
# 📄 Affichage bouton PDF si une date est choisie
site_pdf = (
st.session_state.get("lieu_autorise")
if st.session_state.get("role") != "superviseur"
else st.session_state.get("selected_site")
)
date_pdf = st.session_state.get("selected_date")
if site_pdf and date_pdf:
st.sidebar.markdown("---")
st.sidebar.subheader("📄 Rapport PDF")
if st.sidebar.button("📥 Télécharger létat du jour (PDF)", key="pdf_btn"):
periode = st.session_state.get("selected_periode", "Toute la journée")
generer_pdf(site_pdf, date, periode)
# --- Forcer une alerte de test dynamique (réservé aux superviseurs)
if st.session_state.get("authenticated") and st.session_state.get("role") == "superviseur":
site_actuel = (
st.session_state.get("lieu_autorise")
if st.session_state.get("role") != "superviseur"
else st.session_state.get("selected_site")
)
if site_actuel:
# Initialiser la détection de changement de site
if "last_site" not in st.session_state:
st.session_state["last_site"] = None
# Si le site a changé, recharger les sondes disponibles
if site_actuel != st.session_state["last_site"]:
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute(f"SELECT DISTINCT Sonde FROM `{site_actuel}` ORDER BY Sonde ASC")
st.session_state["sondes_dispo"] = [row[0] for row in cursor.fetchall()]
cursor.close()
conn.close()
st.session_state["last_site"] = site_actuel
except Exception as e:
st.sidebar.warning(f"Erreur chargement des sondes : {e}")
# Affichage de la liste des sondes disponibles
sondes_dispo = st.session_state.get("sondes_dispo", [])
if sondes_dispo:
st.sidebar.markdown("---")
st.sidebar.subheader("🧪 Test alerte manuelle")
sonde_test = st.sidebar.selectbox("Choisir une sonde :", sondes_dispo)
if st.sidebar.button("🔔 Forcer une alerte de test"):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
table_alertes = f"Alertes_{site_actuel}"
cursor.execute(f"""
INSERT INTO {table_alertes} (Sonde, Debut_defaut, Status)
VALUES (%s, %s, %s)
""", (sonde_test, now, "Test"))
conn.commit()
cursor.close()
conn.close()
st.success(f"Alerte de test créée pour {sonde_test} à {now}")
except Exception as e:
st.error(f"Erreur lors de la création de l'alerte : {e}")
else:
st.success(f"Connecté ({st.session_state['role']})")
if st.button("🔓 Déconnexion", key="logout_main"):
st.session_state["authenticated"] = False
st.session_state["role"] = None
st.session_state["lieu_autorise"] = None
st.rerun()
st.markdown("---")
st.subheader("📄 Rapport PDF")
if "selected_date" in st.session_state:
if st.button("📅 Télécharger l'état du jour (PDF)"):
site = st.session_state["lieu_autorise"]
date_val = st.session_state["selected_date"].strftime("%Y-%m-%d")
generer_pdf(site, date_val, periode)
else:
st.info("Sélectionnez une date pour activer la génération PDF.")
# Récupération des sondes actives
def get_sondes_par_lieu(lieu):
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,))
sondes = cursor.fetchall()
conn.close()
return sondes
# Mise à jour statut entretien
def maj_entretien(sonde_id, statut, utilisateur):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (statut, sonde_id))
# Insertion dans Connexion_log
action = "Mise en entretien" if statut else "Sortie d'entretien"
cursor.execute(
"INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action) VALUES (%s, %s, %s, %s)",
(utilisateur, lieu, datetime.now(), action)
)
conn.commit()
conn.close()
# --- CONTENU PRINCIPAL SI AUTHENTIFIÉ ---
if st.session_state["authenticated"]: if st.session_state["authenticated"]:
# --- AFFICHAGE GLOBAL DES ALERTES NON ACQUITTÉES --- onglets = ["Accueil", "Entretien"] if st.session_state["role"] != "superviseur" else ["Accueil", "Statistiques", "Entretien", "Traffic", "Utilisateurs"]
try: onglet_selectionne = st.sidebar.radio("📁 Navigation", onglets, index=onglets.index(st.session_state["onglet_actif"]))
conn = mysql.connector.connect(**db_config) st.session_state["onglet_actif"] = onglet_selectionne
cursor = conn.cursor(dictionary=True)
if "role" not in st.session_state:
st.session_state["role"] = None
if "lieu_autorise" not in st.session_state: site_actuel = st.session_state.get("lieu_autorise") if st.session_state["role"] != "superviseur" else st.session_state.get("selected_site", "Saclay")
st.session_state["lieu_autorise"] = None date_selectionnee = st.session_state.get("selected_date", date.today())
site_selectionne = ( periode_selectionnee = st.session_state.get("selected_periode", "Toute la journée")
st.session_state["lieu_autorise"]
if st.session_state["role"] != "superviseur"
else st.session_state.get("selected_site", "Saclay")
)
table_alertes = f"Alertes_{site_selectionne}" # --- Onglet Accueil ---
cursor.execute( if onglet_selectionne == "Accueil":
f"SELECT Sonde, Debut_defaut, Status FROM `{table_alertes}` WHERE Status != 'Acquitté' ORDER BY Debut_defaut DESC"
)
alertes = cursor.fetchall()
if alertes:
df_alertes = pd.DataFrame(alertes)
st.subheader("🚨 Alertes non acquittées")
st.dataframe(df_alertes, use_container_width=True)
else:
st.success("✅ Aucune alerte en cours.")
cursor.close()
conn.close()
except Exception as e:
st.error(f"Erreur lors de la récupération des alertes : {e}")
# --- NAVIGATION --- (toujours défini, évite les erreurs)
onglet = st.session_state.get("onglet_actif", "Accueil")
if st.session_state.get("authenticated"):
if st.session_state["role"] == "superviseur":
onglets_possibles = ["Accueil", "Statistiques", "Entretien", "Traffic", "Utilisateurs"]
else:
onglets_possibles = ["Accueil", "Entretien"]
onglet = st.sidebar.radio("📁 Navigation", onglets_possibles, index=onglets_possibles.index(onglet))
st.session_state["onglet_actif"] = onglet
# --- ONGLET ACCUEIL ---
if onglet == "Accueil":
st.markdown("## Sélection du site et de la date")
try: try:
conn = mysql.connector.connect(**db_config) conn = get_connection()
cursor = conn.cursor(dictionary=True) cursor = conn.cursor(dictionary=True)
sites_possibles = ["Saclay", "Meudon"]
if st.session_state["role"] == "superviseur": if st.session_state["role"] == "superviseur":
site_selectionne = st.selectbox("📍 Choisissez un site :", sites_possibles) site_actuel = st.selectbox("📍 Choisissez un site :", ["Saclay", "Meudon"], index=0)
st.session_state["selected_site"] = site_selectionne st.session_state["selected_site"] = site_actuel
else: else:
site_selectionne = st.session_state["lieu_autorise"] st.info(f"Site imposé : {site_actuel}")
st.info(f"Site imposé : {site_selectionne}")
selected_date = st.date_input("📅 Date du relevé", value=date.today()) date_selectionnee = st.date_input("📅 Date du relevé", value=date_selectionnee)
st.session_state["selected_date"] = selected_date st.session_state["selected_date"] = date_selectionnee
site_selectionne = st.session_state.get("lieu_autorise") or st.session_state.get("selected_site")
if not site_selectionne: cursor.execute(f"SELECT * FROM `{site_actuel}` WHERE DATE(Date) = %s ORDER BY Sonde, Date DESC",
st.warning("Aucun site sélectionné.") (date_selectionnee.strftime("%Y-%m-%d"),))
st.stop()
cursor.execute(
f"SELECT * FROM `{site_selectionne}` WHERE DATE(Date) = %s ORDER BY Sonde, Date DESC",
(selected_date.strftime("%Y-%m-%d"),)
)
rows = cursor.fetchall() rows = cursor.fetchall()
if rows: if rows:
df = pd.DataFrame(rows) df = pd.DataFrame(rows)
df["Date"] = pd.to_datetime(df["Date"]) df["Date"] = pd.to_datetime(df["Date"])
sondes = sorted(df["Sonde"].unique()) sondes = sorted(df["Sonde"].unique())
sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes, key="selectbox_accueil") sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes)
df_sonde = df[df["Sonde"] == sonde_choisie] df_sonde = df[df["Sonde"] == sonde_choisie].copy()
df_sonde["Heure"] = df_sonde["Date"].dt.hour
df_sonde.loc[:, "Heure"] = df_sonde["Date"].dt.hour tranche = st.radio("🕒 Tranche horaire :", ["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"])
tranche = st.radio("🕒 Tranche horaire :",
["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"])
st.session_state["selected_periode"] = tranche st.session_state["selected_periode"] = tranche
if tranche == "Matin (6h-12h)": if tranche == "Matin (6h-12h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)] df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)]
elif tranche == "Après-midi (12h-18h)": elif tranche == "Après-midi (12h-18h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)] df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)]
elif tranche == "Nuit (18h-6h)": elif tranche == "Nuit (18h-6h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)] df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)]
df_sonde = df_sonde.copy()
seuil_temp = 10
cursor.execute("SELECT Temp_Max FROM Chambres_froides WHERE Lieu = %s AND Sonde = %s", cursor.execute("SELECT Temp_Max FROM Chambres_froides WHERE Lieu = %s AND Sonde = %s",
(site_selectionne, sonde_choisie)) (site_actuel, sonde_choisie))
seuil = cursor.fetchone() seuil = cursor.fetchone()
seuil_temp = seuil["Temp_Max"] if seuil else 10 if seuil:
seuil_temp = seuil["Temp_Max"]
st.subheader("📊 Tableau des relevés") st.subheader("📊 Tableau des relevés")
df_filtre = df_sonde.copy()
df_filtre = df_filtre.drop(columns="Id", errors="ignore")
def surlignage_temp(val): def surlignage_temp(val):
try: try:
@@ -492,17 +157,16 @@ if st.session_state["authenticated"]:
pass pass
return "" return ""
styled_df = df_sonde.style.applymap(surlignage_temp, subset=["Temperature"])
styled_df = df_filtre.style.applymap(surlignage_temp, subset=["Temperature"])
st.dataframe(styled_df, use_container_width=True) st.dataframe(styled_df, use_container_width=True)
st.subheader("📈 Évolution de la température") st.subheader("📈 Évolution de la température")
fig, ax = plt.subplots(figsize=(10, 4)) fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(df_filtre["Date"], df_filtre["Temperature"], marker='o', label="Température") ax.plot(df_sonde["Date"], df_sonde["Temperature"], marker='o')
ax.axhline(seuil_temp, color='red', linestyle='--', label=f"Seuil {seuil_temp}°C") ax.axhline(seuil_temp, color='red', linestyle='--', label=f"Seuil {seuil_temp}°C")
ax.set_xlabel("Heure") ax.set_xlabel("Heure")
ax.set_ylabel("Température (°C)") ax.set_ylabel("Température (°C)")
ax.set_title(f"{sonde_choisie} - {selected_date.strftime('%d/%m/%Y')}") ax.set_title(f"{sonde_choisie} - {date_selectionnee.strftime('%d/%m/%Y')}")
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M')) ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.legend() ax.legend()
st.pyplot(fig) st.pyplot(fig)
@@ -510,11 +174,13 @@ if st.session_state["authenticated"]:
cursor.close() cursor.close()
conn.close() conn.close()
except Exception as e:
st.error(f"Erreur MySQL : {e}")
# ---- ONGLET STATISTIQUES --- except Exception as e:
elif onglet == "Statistiques": st.error(f"Erreur : {e}")
st.text(traceback.format_exc())
# --- Onglet Statistiques ---
elif onglet_selectionne == "Statistiques":
st.markdown("## 📈 Statistiques de température") st.markdown("## 📈 Statistiques de température")
try: try:
conn = mysql.connector.connect(**db_config) conn = mysql.connector.connect(**db_config)
@@ -558,229 +224,66 @@ if st.session_state["authenticated"]:
except Exception as e: except Exception as e:
st.error(f"Erreur chargement statistiques : {e}") st.error(f"Erreur chargement statistiques : {e}")
# --- Fonctionnalités administrateur : ajout et gestion des chambres froides --- # --- Onglet Entretien ---
if st.session_state["role"] == "superviseur": elif onglet_selectionne == "Entretien":
with st.expander("+ Ajouter une nouvelle chambre froide", expanded=False): st.header("🧰 Gestion Entretien")
with st.form("ajout_sonde"):
nouvelle_sonde = st.text_input("Nom de la nouvelle sonde :")
seuil_max = st.number_input("Température maximale autorisée :", min_value=-50.0, max_value=50.0,
value=6.0, step=0.1)
if st.form_submit_button(" Ajouter la nouvelle sonde"):
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
# Insérer dans Chambres_froides
cursor.execute("""
INSERT INTO Chambres_froides (Lieu, Sonde, Temp_Max, Etat)
VALUES (%s, %s, %s, %s)
""", (site_actuel, nouvelle_sonde, seuil_max, "ON"))
conn.commit()
# Insérer un premier relevé dans la table de relevés
table_releves = site_actuel
cursor.execute(f"""
INSERT INTO `{table_releves}` (Date, Sonde, Temperature)
VALUES (NOW(), %s, %s)
""", (nouvelle_sonde, 0.0))
conn.commit()
cursor.close()
conn.close()
st.success(
f"Sonde {nouvelle_sonde} ajoutée avec Temp_Max {seuil_max}°C et premier relevé enregistré.")
except Exception as e:
st.error(f"Erreur lors de l'ajout de la sonde : {e}")
# --- Affichage automatique des alertes non acquittées ---
site_selectionne = (
st.session_state.get("lieu_autorise")
if st.session_state.get("role") != "superviseur"
else st.session_state.get("selected_site")
)
try: try:
conn = mysql.connector.connect(**db_config) conn = get_connection()
cursor = conn.cursor(dictionary=True) cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (site_actuel,))
table_alertes = f"Alertes_{site_selectionne}"
cursor.close()
conn.close()
except Exception as e:
st.error(f"Erreur lors de la récupération des alertes : {e}")
if st.session_state["role"] == "superviseur":
with st.expander("🛠️ Gestion des chambres froides (administrateur)", expanded=True):
if st.button("🔄 Actualiser la liste"):
st.session_state["refresh_admin"] = random.randint(0, 9999)
try:
conn_admin = mysql.connector.connect(**db_config)
cursor_admin = conn_admin.cursor(dictionary=True)
cursor_admin.execute("SELECT * FROM Chambres_froides WHERE Lieu = %s", (site,))
chambres = cursor_admin.fetchall()
if not chambres:
st.warning("Aucune chambre froide pour ce site.")
else:
for chambre in chambres:
col1, col2, col3 = st.columns([3, 1, 2])
with col1:
st.markdown(f"**{chambre['Sonde']}**")
with col2:
etat = st.checkbox("ON", value=(chambre["Etat"] == "ON"),
key=f"etat_{chambre['Id']}_{st.session_state.get('refresh_admin', 0)}")
new_etat = "ON" if etat else "OFF"
with col3:
temp_max = chambre["Temp_Max"]
moins, val, plus = st.columns([1, 2, 1])
with moins:
if st.button("", key=f"moins_{chambre['Id']}"):
temp_max -= 1
with val:
st.markdown(f"<div style='text-align:center;font-size:20px'>{temp_max}°C</div>",
unsafe_allow_html=True)
with plus:
if st.button("", key=f"plus_{chambre['Id']}"):
temp_max += 1
if new_etat != chambre["Etat"] or temp_max != chambre["Temp_Max"]:
cursor_admin.execute(
"UPDATE Chambres_froides SET Etat = %s, Temp_Max = %s WHERE Id = %s",
(new_etat, temp_max, chambre["Id"])
)
conn_admin.commit()
st.success(f"{chambre['Sonde']} mise à jour")
cursor_admin.close()
conn_admin.close()
except Exception as e:
st.error(f"Erreur SQL (admin) : {e}")
# --- ONGLET ENTRETIEN ---
elif onglet == "Entretien":
st.markdown("## 🧰 Gestion des sondes en entretien")
st.markdown("</div><br>", unsafe_allow_html=True)
role = st.session_state.get("role", "utilisateur")
lieu = st.session_state.get("lieu_autorise") if role != "superviseur" else st.selectbox("Choisir un lieu :",
["Saclay", "Meudon"])
if not lieu:
st.warning("Aucun site sélectionné.")
st.stop()
else:
st.info(f"Site imposé : {lieu}" if role != "superviseur" else "")
def get_sondes_par_lieu(lieu):
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,))
sondes = cursor.fetchall() sondes = cursor.fetchall()
cursor.close()
conn.close()
return sondes
def maj_entretien(sonde_id, en_entretien, utilisateur, cible):
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (en_entretien, sonde_id))
cursor.execute(
"INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action, Cible) VALUES (%s, %s, %s, %s, %s)",
(utilisateur, lieu, datetime.now(), f"{'Mise' if en_entretien else 'Retrait'} en entretien",
cible))
conn.commit()
cursor.close()
conn.close()
try:
sondes = get_sondes_par_lieu(lieu)
for sonde in sondes: for sonde in sondes:
checked = st.checkbox(f"{sonde['Sonde']} (ID: {sonde['Id']})", value=sonde['En_entretien'], checked = st.checkbox(f"{sonde['Sonde']}", value=sonde['En_entretien'])
key=f"entretien_{sonde['Id']}")
if checked != sonde['En_entretien']: if checked != sonde['En_entretien']:
maj_entretien(sonde['Id'], checked, st.session_state.get("utilisateur", "Inconnu"), sonde['Sonde']) cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s",
(checked, sonde['Id']))
conn.commit()
st.success(f"{sonde['Sonde']} {'mise' if checked else 'retirée'} en entretien.") st.success(f"{sonde['Sonde']} {'mise' if checked else 'retirée'} en entretien.")
except Exception as e:
st.error(f"Erreur lors du chargement des sondes : {e}")
# --- ONGLET TRAFFIC ------
elif onglet == "Traffic":
st.markdown("## 🚦 Connexions récentes")
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT Utilisateur, Lieu, Date_Connexion FROM Connexion_Log WHERE Lieu IS NOT NULL ORDER BY Date_Connexion DESC LIMIT 200"
)
connexions = cursor.fetchall()
cursor.close() cursor.close()
conn.close() conn.close()
if connexions:
df_connexions = pd.DataFrame(connexions)
st.dataframe(df_connexions, use_container_width=True)
else:
st.info("Aucune connexion enregistrée.")
except Exception as e: except Exception as e:
st.error(f"Erreur chargement des connexions : {e}") st.error(f"Erreur : {e}")
st.text(traceback.format_exc())
# --- ONGLET UTILISATEURS --- # --- Onglet Traffic ---
elif onglet == "Utilisateurs" and st.session_state.get("role") == "superviseur": elif onglet_selectionne == "Traffic":
st.title("👥 Gestion des utilisateurs") st.header("🚦 Connexions récentes")
try:
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT Utilisateur, Lieu, Date_Connexion FROM Connexion_Log ORDER BY Date_Connexion DESC LIMIT 100")
logs = cursor.fetchall()
df_logs = pd.DataFrame(logs)
st.dataframe(df_logs)
cursor.close()
conn.close()
except Exception as e:
st.error(f"Erreur : {e}")
st.text(traceback.format_exc())
# --- Section Ajout --- # --- Onglet Utilisateurs ---
with st.expander(" Ajouter un nouvel utilisateur"): elif onglet_selectionne == "Utilisateurs":
st.header("👥 Gestion des utilisateurs")
with st.form("ajouter_utilisateur"):
new_user = st.text_input("Nom d'utilisateur") new_user = st.text_input("Nom d'utilisateur")
new_pass = st.text_input("Mot de passe", type="password") new_pass = st.text_input("Mot de passe", type="password")
new_role = st.selectbox("Rôle", ["utilisateur", "superviseur"]) new_role = st.selectbox("Rôle", ["utilisateur", "superviseur"])
new_lieu = None new_lieu = st.selectbox("Lieu", ["Saclay", "Meudon", "Roissy"])
if new_role == "utilisateur": if st.form_submit_button("Ajouter"):
new_lieu = st.selectbox("Lieu autorisé", ["Saclay", "Meudon", "Roissy"]) try:
expiration = st.date_input("Date d'expiration (facultative)", value=None) conn = get_connection()
cursor = conn.cursor()
if st.button("Créer l'utilisateur"): hash_mdp = hash_password(new_pass)
if new_user and new_pass: cursor.execute("INSERT INTO MotsDePasse (utilisateur, mot_de_passe, role, Lieu) VALUES (%s, %s, %s, %s)",
success, message = ajouter_utilisateur( (new_user, hash_mdp, new_role, new_lieu))
utilisateur=new_user, conn.commit()
mot_de_passe=new_pass, cursor.close()
role=new_role, conn.close()
lieu=new_lieu if new_role == "utilisateur" else None, st.success("Utilisateur ajouté.")
expiration=expiration if expiration else None except Exception as e:
) st.error(f"Erreur : {e}")
if success: st.text(traceback.format_exc())
st.success(message)
else:
st.error(message)
else:
st.warning("Tous les champs obligatoires doivent être remplis.")
# --- Section Liste ---
st.markdown("### 📋 Utilisateurs existants")
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT utilisateur, role, Lieu, Expiration FROM MotsDePasse ORDER BY utilisateur")
users = cursor.fetchall()
cursor.close()
if users:
df_users = pd.DataFrame(users)
st.dataframe(df_users)
else:
st.info("Aucun utilisateur trouvé.")
# ✅ Appel de la gestion des expirations
afficher_gestion_expiration(conn)
conn.close()
except Exception as e:
st.error(f"Erreur lors du chargement des utilisateurs : {e}")