diff --git a/Interface.py b/Interface.py
new file mode 100644
index 0000000..b4b6ebb
--- /dev/null
+++ b/Interface.py
@@ -0,0 +1,786 @@
+# -*- coding: utf-8 -*-
+import streamlit as st
+import mysql.connector
+import pandas as pd
+import matplotlib.pyplot as plt
+import matplotlib.dates as mdates
+from fpdf import FPDF
+import os
+import random
+from dotenv import load_dotenv
+from datetime import datetime, date, time
+import bcrypt
+
+# Charger les variables d'environnement
+load_dotenv()
+
+st.set_page_config(page_title="Domo91 - Surveillance", layout="wide")
+if "authenticated" not in st.session_state:
+ st.session_state["authenticated"] = False
+ st.session_state["role"] = None
+ st.session_state["lieu_autorise"] = None
+
+st.title("📡 Supervision Températures")
+
+db_config = {
+ "host": os.getenv("DB_HOST"),
+ "user": os.getenv("DB_USER"),
+ "password": os.getenv("DB_PASSWORD"),
+ "database": os.getenv("DB_NAME")
+}
+def get_connection():
+ return mysql.connector.connect(**db_config)
+
+# --- Fonction de génération PDF ---
+def generer_pdf(site, date_str, periode):
+ st.info(f"Génération du rapport PDF pour {site} à la date {date_str} ({periode})")
+ try:
+ conn = mysql.connector.connect(**db_config)
+ pdf_cursor = conn.cursor(dictionary=True)
+
+ # Requête principale
+ pdf_cursor.execute(f"SELECT Sonde, Date, Temperature FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", (date_str,))
+ rows = pdf_cursor.fetchall()
+ df = pd.DataFrame(rows)
+ df["Heure"] = pd.to_datetime(df["Date"]).dt.strftime("%H:%M")
+ df["Heure_obj"] = pd.to_datetime(df["Date"]).dt.time
+
+ # --- Filtrage par période ---
+ plages = {
+ "Toute la journée": (time(0, 0), time(23, 59)),
+ "Matin (6h-12h)": (time(6, 0), time(12, 0)),
+ "Après-midi (12h-18h)": (time(12, 0), time(18, 0)),
+ "Nuit (18h-6h)": (time(18, 0), time(6, 0)),
+ }
+ heure_debut, heure_fin = plages.get(periode, (time(0, 0), time(23, 59)))
+ if heure_debut < heure_fin:
+ df = df[(df["Heure_obj"] >= heure_debut) & (df["Heure_obj"] <= heure_fin)]
+ else:
+ df = df[(df["Heure_obj"] >= heure_debut) | (df["Heure_obj"] <= heure_fin)]
+
+ # --- Structuration des relevés ---
+ releves = {}
+ for sonde in df["Sonde"].unique():
+ df_sonde = df[df["Sonde"] == sonde]
+ releves[sonde] = list(zip(df_sonde["Heure"], df_sonde["Temperature"]))
+
+ # --- Requête alertes ---
+ table_alertes = f"Alertes_{site}"
+ pdf_cursor.execute(f"SELECT Sonde, Debut_defaut, Status FROM {table_alertes} WHERE DATE(Debut_defaut) = %s", (date_str,))
+ alertes = pdf_cursor.fetchall()
+
+ pdf_cursor.close()
+ conn.close()
+
+ # --- Classe PDF ---
+ class RapportPDF(FPDF):
+ def header(self):
+ self.set_font("Arial", "B", 14)
+ self.cell(0, 10, "Rapport de surveillance des sondes", ln=1, align="C")
+ self.set_font("Arial", "", 12)
+ self.cell(0, 10, f"Date : {date_str}", ln=1, align="C")
+ self.cell(0, 10, f"Periode : {getattr(self, 'periode', '')}", ln=1, align="C")
+ self.ln(5)
+
+ def site_info(self, site_name):
+ self.set_font("Arial", "B", 12)
+ self.cell(0, 10, f"Site : {site_name}", ln=1)
+ self.ln(2)
+
+ def releves_section(self, data):
+ self.set_font("Arial", "B", 12)
+ self.cell(0, 10, "Relevés de température", ln=1)
+
+ for sonde, mesures in data.items():
+ self.set_font("Arial", "B", 11)
+ self.cell(0, 8, f"Sonde : {sonde}", ln=1)
+
+ col1 = mesures[::2]
+ col2 = mesures[1::2]
+
+ self.set_font("Arial", "B", 10)
+ self.cell(40, 6, "Heure", border=1)
+ self.cell(30, 6, "Temp (°C)", border=1)
+ self.cell(20, 6, "", border=0)
+ self.cell(40, 6, "Heure", border=1)
+ self.cell(30, 6, "Temp (°C)", border=1)
+ self.ln()
+
+ self.set_font("Arial", "", 10)
+ for i in range(max(len(col1), len(col2))):
+ if i < len(col1):
+ h1, t1 = col1[i]
+ self.cell(40, 6, h1, border=1)
+ self.cell(30, 6, f"{t1:.2f}", border=1)
+ else:
+ self.cell(70, 6, "", border=0)
+ self.cell(20, 6, "", border=0)
+ if i < len(col2):
+ h2, t2 = col2[i]
+ self.cell(40, 6, h2, border=1)
+ self.cell(30, 6, f"{t2:.2f}", border=1)
+ self.ln()
+ self.ln(4)
+
+ def alertes_section(self, data):
+ self.set_font("Arial", "B", 12)
+ self.cell(0, 10, "Alertes enregistrées", ln=1)
+ self.set_font("Arial", "", 10)
+ for a in data:
+ self.cell(0, 6, f"{a['Sonde']} - {a['Debut_defaut']} - {a['Status']}", ln=1)
+
+ # --- Génération du PDF ---
+ pdf = RapportPDF()
+ pdf.periode = periode
+ pdf.add_page()
+ pdf.site_info(site)
+ pdf.releves_section(releves)
+ pdf.alertes_section(alertes)
+
+ file_name = f"rapport_{site}_{date_str}.pdf"
+ output_dir = "PDF"
+ os.makedirs(output_dir, exist_ok=True)
+ output_path = os.path.join(output_dir, file_name)
+
+ pdf.output(output_path)
+
+ with open(output_path, "rb") as f:
+ st.download_button(
+ label="📥 Télécharger le rapport PDF",
+ data=f,
+ file_name=file_name,
+ mime="application/pdf"
+ )
+
+ except Exception as err1:
+ st.error(f"Erreur lors de la génération du PDF : {err1}")
+
+
+# --- Initialisation des variables de session ---
+if "authenticated" not in st.session_state:
+ st.session_state["authenticated"] = False
+if "role" not in st.session_state:
+ st.session_state["role"] = None
+if "lieu_autorise" not in st.session_state:
+ st.session_state["lieu_autorise"] = None
+
+# --- Connexion utilisateur dans la sidebar ---
+ st.sidebar.header("🔐 Connexion")
+if not st.session_state.get("authenticated"):
+ login = st.sidebar.text_input("Nom d'utilisateur")
+ password = st.sidebar.text_input("Mot de passe", type="password")
+ if st.sidebar.button("Se connecter"):
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+ cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (login,))
+ result = cursor.fetchone()
+ def verifier_password(mot_de_passe_saisi, hash_en_base):
+ return bcrypt.checkpw(mot_de_passe_saisi.encode('utf-8'), hash_en_base.encode('utf-8'))
+ if result and verifier_password(password, result["mot_de_passe"]):
+ if result["Expiration"] and result["Expiration"] < date.today():
+ st.sidebar.error("⛔ Votre accès a expiré. Veuillez contacter un administrateur.")
+ cursor.close()
+ conn.close()
+ st.stop()
+ st.session_state["authenticated"] = True
+ st.session_state["role"] = result["role"]
+ st.session_state["lieu_autorise"] = result["Lieu"]
+ st.success(f"Connecté comme {result['role']} ({result['Lieu']})")
+ # Enregistrement de la connexion dans Connexion_Log
+ try:
+ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ cursor.execute("""
+ INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion)
+ VALUES (%s, %s, %s)
+ """, (login, result["Lieu"], now))
+ conn.commit()
+ except Exception as e:
+ st.warning(f"⚠️ Connexion enregistrée échouée : {e}")
+ st.rerun()
+ else:
+ st.sidebar.error("Identifiants invalides")
+ cursor.close()
+ conn.close()
+ except Exception as e:
+ st.sidebar.error(f"Erreur lors de la connexion à la base : {e}")
+else:
+ st.sidebar.success(f"Connecté ({st.session_state['role']})")
+ if st.sidebar.button("🔓 Déconnexion", key="logout_sidebar"):
+ st.session_state["authenticated"] = False
+ st.session_state["role"] = None
+ st.session_state["lieu_autorise"] = None
+ st.rerun()
+
+def hash_password(password):
+ return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
+
+def ajouter_utilisateur(utilisateur, mot_de_passe, role, lieu, expiration):
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor()
+ cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (utilisateur,))
+ if cursor.fetchone():
+ return False, "❌ Utilisateur déjà existant."
+
+ hash_mdp = hash_password(mot_de_passe)
+ cursor.execute("""
+ INSERT INTO MotsDePasse (utilisateur, mot_de_passe, role, Lieu, Expiration)
+ VALUES (%s, %s, %s, %s, %s)
+ """, (utilisateur, hash_mdp, role, lieu, expiration))
+ conn.commit()
+ cursor.close()
+ conn.close()
+ return True, "✅ Utilisateur ajouté avec succès."
+ except Exception as e:
+ return False, f"⚠️ Erreur : {e}"
+
+def afficher_gestion_expiration(conn):
+ st.subheader("🔐 Gestion des expirations d'accès")
+
+ # Récupérer les utilisateurs avec leurs dates d'expiration
+ cursor = conn.cursor(dictionary=True)
+ cursor.execute("SELECT Id, utilisateur, Expiration FROM MotsDePasse")
+ users = cursor.fetchall()
+ cursor.close()
+
+ df = pd.DataFrame(users)
+ df['Expiration'] = pd.to_datetime(df['Expiration']).dt.date
+
+ for _, row in df.iterrows():
+ est_expire = row['Expiration'] < date.today()
+ fond = "#ffe6e6" if est_expire else "#f8f9fa"
+ with st.container():
+ st.markdown(f"
",
+ unsafe_allow_html=True)
+ col1, col2, col3 = st.columns([4, 2, 1])
+ with col1:
+ st.markdown(f"**Utilisateur :** {row['utilisateur']}")
+ if est_expire:
+ st.markdown("
⛔ Accès expiré",
+ unsafe_allow_html=True)
+
+ with col2:
+ new_date = st.date_input("Expiration", row['Expiration'], key=f"exp_{row['Id']}")
+ with col3:
+ if st.button("✅", key=f"save_{row['Id']}"):
+ try:
+ cursor = conn.cursor()
+ cursor.execute("UPDATE MotsDePasse SET Expiration = %s WHERE Id = %s",
+ (new_date, row['Id']))
+ conn.commit()
+ st.success(f"✅ {row['utilisateur']} mis à jour")
+ cursor.close()
+ except Exception as e:
+ st.error(f"Erreur : {e}")
+
+# 📄 Affichage bouton PDF si une date est choisie
+site_pdf = (
+ st.session_state.get("lieu_autorise")
+ if st.session_state.get("role") != "superviseur"
+
+ else st.session_state.get("selected_site")
+ )
+date_pdf = st.session_state.get("selected_date")
+if site_pdf and date_pdf:
+ st.sidebar.markdown("---")
+ st.sidebar.subheader("📄 Rapport PDF")
+ if st.sidebar.button("📥 Télécharger l’état du jour (PDF)", key="pdf_btn"):
+ periode = st.session_state.get("selected_periode", "Toute la journée")
+ generer_pdf(site_pdf, date, periode)
+
+# --- Forcer une alerte de test dynamique (réservé aux superviseurs)
+if st.session_state.get("authenticated") and st.session_state.get("role") == "superviseur":
+ site_actuel = (
+ st.session_state.get("lieu_autorise") or "Saclay"
+ if st.session_state.get("role") != "superviseur"
+ else st.session_state.get("selected_site")
+ )
+
+ if site_actuel:
+ # Initialiser la détection de changement de site
+ if "last_site" not in st.session_state:
+ st.session_state["last_site"] = None
+
+ # Si le site a changé, recharger les sondes disponibles
+ if site_actuel != st.session_state["last_site"]:
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor()
+ cursor.execute(f"SELECT DISTINCT Sonde FROM `{site_actuel}` ORDER BY Sonde ASC")
+ st.session_state["sondes_dispo"] = [row[0] for row in cursor.fetchall()]
+ cursor.close()
+ conn.close()
+ st.session_state["last_site"] = site_actuel
+ except Exception as e:
+ st.sidebar.warning(f"Erreur chargement des sondes : {e}")
+
+ # Affichage de la liste des sondes disponibles
+ sondes_dispo = st.session_state.get("sondes_dispo", [])
+
+ if sondes_dispo:
+ st.sidebar.markdown("---")
+ st.sidebar.subheader("🧪 Test alerte manuelle")
+ sonde_test = st.sidebar.selectbox("Choisir une sonde :", sondes_dispo)
+
+ if st.sidebar.button("🔔 Forcer une alerte de test"):
+ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor()
+ table_alertes = f"Alertes_{site_actuel}"
+ cursor.execute(f"""
+ INSERT INTO {table_alertes} (Sonde, Debut_defaut, Status)
+ VALUES (%s, %s, %s)
+ """, (sonde_test, now, "Test"))
+ conn.commit()
+ cursor.close()
+ conn.close()
+ st.success(f"Alerte de test créée pour {sonde_test} à {now}")
+ except Exception as e:
+ st.error(f"Erreur lors de la création de l'alerte : {e}")
+
+ else:
+ st.success(f"Connecté ({st.session_state['role']})")
+ if st.button("🔓 Déconnexion", key="logout_main"):
+ st.session_state["authenticated"] = False
+ st.session_state["role"] = None
+ st.session_state["lieu_autorise"] = None
+ st.rerun()
+
+ st.markdown("---")
+ st.subheader("📄 Rapport PDF")
+ if "selected_date" in st.session_state:
+ if st.button("📅 Télécharger l'état du jour (PDF)"):
+ site = st.session_state["lieu_autorise"]
+ date_val = st.session_state["selected_date"].strftime("%Y-%m-%d")
+ generer_pdf(site, date_val, periode)
+ else:
+ st.info("Sélectionnez une date pour activer la génération PDF.")
+# Récupération des sondes actives
+def get_sondes_par_lieu(lieu):
+ conn = get_connection()
+ cursor = conn.cursor(dictionary=True)
+ cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,))
+ sondes = cursor.fetchall()
+ conn.close()
+ return sondes
+
+# Mise à jour statut entretien
+def maj_entretien(sonde_id, statut, utilisateur):
+ conn = get_connection()
+ cursor = conn.cursor()
+ cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (statut, sonde_id))
+ # Insertion dans Connexion_log
+ action = "Mise en entretien" if statut else "Sortie d'entretien"
+ cursor.execute(
+ "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action) VALUES (%s, %s, %s, %s)",
+ (utilisateur, lieu, datetime.now(), action)
+ )
+ conn.commit()
+ conn.close()
+
+# --- CONTENU PRINCIPAL SI AUTHENTIFIÉ ---
+if st.session_state["authenticated"]:
+ # --- AFFICHAGE GLOBAL DES ALERTES NON ACQUITTÉES ---
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+ if "role" not in st.session_state:
+ st.session_state["role"] = None
+
+ if "lieu_autorise" not in st.session_state:
+ st.session_state["lieu_autorise"] = None
+ site_selectionne = (
+ st.session_state["lieu_autorise"]
+ if st.session_state["role"] != "superviseur"
+ else st.session_state.get("selected_site", "Saclay")
+ )
+
+ table_alertes = f"Alertes_{site_selectionne}"
+ cursor.execute(
+ f"SELECT Sonde, Debut_defaut, Status FROM `{table_alertes}` WHERE Status != 'Acquitté' ORDER BY Debut_defaut DESC"
+ )
+ alertes = cursor.fetchall()
+
+ if alertes:
+ df_alertes = pd.DataFrame(alertes)
+ st.subheader("🚨 Alertes non acquittées")
+ st.dataframe(df_alertes, use_container_width=True)
+ else:
+ st.success("✅ Aucune alerte en cours.")
+
+ cursor.close()
+ conn.close()
+ except Exception as e:
+ st.error(f"Erreur lors de la récupération des alertes : {e}")
+
+ # --- NAVIGATION --- (toujours défini, évite les erreurs)
+ onglet = st.session_state.get("onglet_actif", "Accueil")
+
+ if st.session_state.get("authenticated"):
+ if st.session_state["role"] == "superviseur":
+ onglets_possibles = ["Accueil", "Statistiques", "Entretien", "Traffic", "Utilisateurs"]
+
+ else:
+ onglets_possibles = ["Accueil", "Entretien"]
+
+ onglet = st.sidebar.radio("📁 Navigation", onglets_possibles, index=onglets_possibles.index(onglet))
+ st.session_state["onglet_actif"] = onglet
+
+# --- ONGLET ACCUEIL ---
+ if onglet == "Accueil":
+ st.markdown("## Sélection du site et de la date")
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+ sites_possibles = ["Saclay", "Meudon"]
+ if st.session_state["role"] == "superviseur":
+ site_selectionne = st.selectbox("📍 Choisissez un site :", sites_possibles)
+ st.session_state["selected_site"] = site_selectionne
+ else:
+ site_selectionne = st.session_state["lieu_autorise"]
+ st.info(f"Site imposé : {site_selectionne}")
+
+ selected_date = st.date_input("📅 Date du relevé", value=date.today())
+ st.session_state["selected_date"] = selected_date
+ site_selectionne = st.session_state.get("lieu_autorise") or st.session_state.get("selected_site")
+
+ if not site_selectionne:
+ st.warning("Aucun site sélectionné.")
+ st.stop()
+ cursor.execute(
+ f"SELECT * FROM `{site_selectionne}` WHERE DATE(Date) = %s ORDER BY Sonde, Date DESC",
+ (selected_date.strftime("%Y-%m-%d"),)
+ )
+ rows = cursor.fetchall()
+ if rows:
+ df = pd.DataFrame(rows)
+ df["Date"] = pd.to_datetime(df["Date"])
+ sondes = sorted(df["Sonde"].unique())
+ sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes, key="selectbox_accueil")
+ df_sonde = df[df["Sonde"] == sonde_choisie]
+
+ df_sonde.loc[:, "Heure"] = df_sonde["Date"].dt.hour
+
+ tranche = st.radio("🕒 Tranche horaire :",
+ ["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"])
+ st.session_state["selected_periode"] = tranche
+ if tranche == "Matin (6h-12h)":
+ df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)]
+ elif tranche == "Après-midi (12h-18h)":
+ df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)]
+ elif tranche == "Nuit (18h-6h)":
+ df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)]
+ df_sonde = df_sonde.copy()
+
+ cursor.execute("SELECT Temp_Max FROM Chambres_froides WHERE Lieu = %s AND Sonde = %s",
+ (site_selectionne, sonde_choisie))
+ seuil = cursor.fetchone()
+ seuil_temp = seuil["Temp_Max"] if seuil else 10
+
+ st.subheader("📊 Tableau des relevés")
+ df_filtre = df_sonde.copy()
+ df_filtre = df_filtre.drop(columns="Id", errors="ignore")
+
+
+ def surlignage_temp(val):
+ try:
+ if float(val) > seuil_temp:
+ return "color: red; font-weight: bold"
+ except:
+ pass
+ return ""
+
+
+ styled_df = df_filtre.style.applymap(surlignage_temp, subset=["Temperature"])
+ st.dataframe(styled_df, use_container_width=True)
+
+ st.subheader("📈 Évolution de la température")
+ fig, ax = plt.subplots(figsize=(10, 4))
+ ax.plot(df_filtre["Date"], df_filtre["Temperature"], marker='o', label="Température")
+ ax.axhline(seuil_temp, color='red', linestyle='--', label=f"Seuil {seuil_temp}°C")
+ ax.set_xlabel("Heure")
+ ax.set_ylabel("Température (°C)")
+ ax.set_title(f"{sonde_choisie} - {selected_date.strftime('%d/%m/%Y')}")
+ ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
+ ax.legend()
+ st.pyplot(fig)
+
+ cursor.close()
+ conn.close()
+
+ except Exception as e:
+ st.error(f"Erreur MySQL : {e}")
+
+# ---- ONGLET STATISTIQUES ---
+ elif onglet == "Statistiques":
+ st.markdown("## 📈 Statistiques de température")
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+
+ site = (
+ st.session_state["lieu_autorise"]
+ if st.session_state["role"] != "superviseur"
+ else st.session_state.get("selected_site", "Saclay")
+ )
+
+ date_val = st.session_state.get("selected_date", date.today())
+
+ cursor.execute(
+ f"SELECT * FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date",
+ (date_val.strftime("%Y-%m-%d"),)
+ )
+ rows = cursor.fetchall()
+ df = pd.DataFrame(rows)
+
+ if df.empty:
+ st.info("Aucune donnée pour cette date.")
+ else:
+ df["Date"] = pd.to_datetime(df["Date"])
+ sondes = sorted(df["Sonde"].unique())
+ sonde = st.selectbox("Choisir une sonde :", sondes, key="selectbox_stats")
+ df_sonde = df[df["Sonde"] == sonde]
+
+ st.subheader("Évolution journalière")
+ fig, ax = plt.subplots(figsize=(10, 4))
+ ax.plot(df_sonde["Date"], df_sonde["Temperature"], marker='o')
+ ax.set_title(f"{sonde} - {date_val.strftime('%d/%m/%Y')}")
+ ax.set_xlabel("Heure")
+ ax.set_ylabel("Température (°C)")
+ ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
+ st.pyplot(fig)
+
+ cursor.close()
+ conn.close()
+
+ except Exception as e:
+ st.error(f"Erreur chargement statistiques : {e}")
+
+ # --- Fonctionnalités administrateur : ajout et gestion des chambres froides ---
+ if st.session_state["role"] == "superviseur":
+ with st.expander("+ Ajouter une nouvelle chambre froide", expanded=False):
+ with st.form("ajout_sonde"):
+ nouvelle_sonde = st.text_input("Nom de la nouvelle sonde :")
+ seuil_max = st.number_input("Température maximale autorisée :", min_value=-50.0, max_value=50.0,
+ value=6.0, step=0.1)
+
+ if st.form_submit_button("➕ Ajouter la nouvelle sonde"):
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor()
+
+ # Insérer dans Chambres_froides
+ cursor.execute("""
+ INSERT INTO Chambres_froides (Lieu, Sonde, Temp_Max, Etat)
+ VALUES (%s, %s, %s, %s)
+ """, (site_actuel, nouvelle_sonde, seuil_max, "ON"))
+ conn.commit()
+
+ # Insérer un premier relevé dans la table de relevés
+ table_releves = site_actuel
+ cursor.execute(f"""
+ INSERT INTO `{table_releves}` (Date, Sonde, Temperature)
+ VALUES (NOW(), %s, %s)
+ """, (nouvelle_sonde, 0.0))
+ conn.commit()
+
+ cursor.close()
+ conn.close()
+
+ st.success(
+ f"Sonde {nouvelle_sonde} ajoutée avec Temp_Max {seuil_max}°C et premier relevé enregistré.")
+
+ except Exception as e:
+ st.error(f"Erreur lors de l'ajout de la sonde : {e}")
+ # --- Affichage automatique des alertes non acquittées ---
+ site_selectionne = (
+ st.session_state.get("lieu_autorise")
+ if st.session_state.get("role") != "superviseur"
+ else st.session_state.get("selected_site")
+ )
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+
+ table_alertes = f"Alertes_{site_selectionne}"
+
+ cursor.close()
+ conn.close()
+ except Exception as e:
+ st.error(f"Erreur lors de la récupération des alertes : {e}")
+
+ if st.session_state["role"] == "superviseur":
+ with st.expander("🛠️ Gestion des chambres froides (administrateur)", expanded=True):
+ if st.button("🔄 Actualiser la liste"):
+ st.session_state["refresh_admin"] = random.randint(0, 9999)
+
+ try:
+ conn_admin = mysql.connector.connect(**db_config)
+ cursor_admin = conn_admin.cursor(dictionary=True)
+ cursor_admin.execute("SELECT * FROM Chambres_froides WHERE Lieu = %s", (site,))
+ chambres = cursor_admin.fetchall()
+
+ if not chambres:
+ st.warning("Aucune chambre froide pour ce site.")
+ else:
+ for chambre in chambres:
+ col1, col2, col3 = st.columns([3, 1, 2])
+ with col1:
+ st.markdown(f"**{chambre['Sonde']}**")
+
+ with col2:
+ etat = st.checkbox("ON", value=(chambre["Etat"] == "ON"),
+ key=f"etat_{chambre['Id']}_{st.session_state.get('refresh_admin', 0)}")
+ new_etat = "ON" if etat else "OFF"
+
+ with col3:
+ temp_max = chambre["Temp_Max"]
+ moins, val, plus = st.columns([1, 2, 1])
+ with moins:
+ if st.button("▼", key=f"moins_{chambre['Id']}"):
+ temp_max -= 1
+ with val:
+ st.markdown(f"
{temp_max}°C
",
+ unsafe_allow_html=True)
+ with plus:
+ if st.button("▲", key=f"plus_{chambre['Id']}"):
+ temp_max += 1
+
+ if new_etat != chambre["Etat"] or temp_max != chambre["Temp_Max"]:
+ cursor_admin.execute(
+ "UPDATE Chambres_froides SET Etat = %s, Temp_Max = %s WHERE Id = %s",
+ (new_etat, temp_max, chambre["Id"])
+ )
+ conn_admin.commit()
+ st.success(f"{chambre['Sonde']} mise à jour")
+
+ cursor_admin.close()
+ conn_admin.close()
+
+ except Exception as e:
+ st.error(f"Erreur SQL (admin) : {e}")
+
+# --- ONGLET ENTRETIEN ---
+ elif onglet == "Entretien":
+ st.markdown("## 🧰 Gestion des sondes en entretien")
+
+
+ st.markdown("
", unsafe_allow_html=True)
+ role = st.session_state.get("role", "utilisateur")
+ lieu = st.session_state.get("lieu_autorise") if role != "superviseur" else st.selectbox("Choisir un lieu :",
+ ["Saclay", "Meudon"])
+ if not lieu:
+ st.warning("Aucun site sélectionné.")
+ st.stop()
+ else:
+ st.info(f"Site imposé : {lieu}" if role != "superviseur" else "")
+
+ def get_sondes_par_lieu(lieu):
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+ cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,))
+ sondes = cursor.fetchall()
+ cursor.close()
+ conn.close()
+ return sondes
+
+ def maj_entretien(sonde_id, en_entretien, utilisateur, cible):
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor()
+ cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (en_entretien, sonde_id))
+ cursor.execute(
+ "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action, Cible) VALUES (%s, %s, %s, %s, %s)",
+ (utilisateur, lieu, datetime.now(), f"{'Mise' if en_entretien else 'Retrait'} en entretien",
+ cible))
+ conn.commit()
+ cursor.close()
+ conn.close()
+
+
+ try:
+ sondes = get_sondes_par_lieu(lieu)
+ for sonde in sondes:
+ checked = st.checkbox(f"{sonde['Sonde']} (ID: {sonde['Id']})", value=sonde['En_entretien'],
+ key=f"entretien_{sonde['Id']}")
+ if checked != sonde['En_entretien']:
+ maj_entretien(sonde['Id'], checked, st.session_state.get("utilisateur", "Inconnu"), sonde['Sonde'])
+ st.success(f"{sonde['Sonde']} {'mise' if checked else 'retirée'} en entretien.")
+ except Exception as e:
+ st.error(f"Erreur lors du chargement des sondes : {e}")
+
+ # --- ONGLET TRAFFIC ------
+ elif onglet == "Traffic":
+ st.markdown("## 🚦 Connexions récentes")
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+ cursor.execute(
+ "SELECT Utilisateur, Lieu, Date_Connexion FROM Connexion_Log WHERE Lieu IS NOT NULL ORDER BY Date_Connexion DESC LIMIT 200"
+ )
+ connexions = cursor.fetchall()
+ cursor.close()
+ conn.close()
+
+ if connexions:
+ df_connexions = pd.DataFrame(connexions)
+ st.dataframe(df_connexions, use_container_width=True)
+ else:
+ st.info("Aucune connexion enregistrée.")
+ except Exception as e:
+ st.error(f"Erreur chargement des connexions : {e}")
+
+ # --- ONGLET UTILISATEURS ---
+ elif onglet == "Utilisateurs" and st.session_state.get("role") == "superviseur":
+ st.title("👥 Gestion des utilisateurs")
+
+ # --- Section Ajout ---
+ with st.expander("➕ Ajouter un nouvel utilisateur"):
+ new_user = st.text_input("Nom d'utilisateur")
+ new_pass = st.text_input("Mot de passe", type="password")
+ new_role = st.selectbox("Rôle", ["utilisateur", "superviseur"])
+ new_lieu = None
+ if new_role == "utilisateur":
+ new_lieu = st.selectbox("Lieu autorisé", ["Saclay", "Meudon", "Roissy"])
+ expiration = st.date_input("Date d'expiration (facultative)", value=None)
+
+ if st.button("Créer l'utilisateur"):
+ if new_user and new_pass:
+ success, message = ajouter_utilisateur(
+ utilisateur=new_user,
+ mot_de_passe=new_pass,
+ role=new_role,
+ lieu=new_lieu if new_role == "utilisateur" else None,
+ expiration=expiration if expiration else None
+ )
+ if success:
+ st.success(message)
+ else:
+ st.error(message)
+ else:
+ st.warning("Tous les champs obligatoires doivent être remplis.")
+
+ # --- Section Liste ---
+ st.markdown("### 📋 Utilisateurs existants")
+ try:
+ conn = mysql.connector.connect(**db_config)
+ cursor = conn.cursor(dictionary=True)
+ cursor.execute("SELECT utilisateur, role, Lieu, Expiration FROM MotsDePasse ORDER BY utilisateur")
+ users = cursor.fetchall()
+ cursor.close()
+
+ if users:
+ df_users = pd.DataFrame(users)
+ st.dataframe(df_users)
+ else:
+ st.info("Aucun utilisateur trouvé.")
+
+ # ✅ Appel de la gestion des expirations
+ afficher_gestion_expiration(conn)
+ conn.close()
+
+ except Exception as e:
+ st.error(f"Erreur lors du chargement des utilisateurs : {e}")
+
+