diff --git a/Interface.py b/Interface.py new file mode 100644 index 0000000..b4b6ebb --- /dev/null +++ b/Interface.py @@ -0,0 +1,786 @@ +# -*- coding: utf-8 -*- +import streamlit as st +import mysql.connector +import pandas as pd +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +from fpdf import FPDF +import os +import random +from dotenv import load_dotenv +from datetime import datetime, date, time +import bcrypt + +# Charger les variables d'environnement +load_dotenv() + +st.set_page_config(page_title="Domo91 - Surveillance", layout="wide") +if "authenticated" not in st.session_state: + st.session_state["authenticated"] = False + st.session_state["role"] = None + st.session_state["lieu_autorise"] = None + +st.title("📡 Supervision Températures") + +db_config = { + "host": os.getenv("DB_HOST"), + "user": os.getenv("DB_USER"), + "password": os.getenv("DB_PASSWORD"), + "database": os.getenv("DB_NAME") +} +def get_connection(): + return mysql.connector.connect(**db_config) + +# --- Fonction de génération PDF --- +def generer_pdf(site, date_str, periode): + st.info(f"Génération du rapport PDF pour {site} à la date {date_str} ({periode})") + try: + conn = mysql.connector.connect(**db_config) + pdf_cursor = conn.cursor(dictionary=True) + + # Requête principale + pdf_cursor.execute(f"SELECT Sonde, Date, Temperature FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", (date_str,)) + rows = pdf_cursor.fetchall() + df = pd.DataFrame(rows) + df["Heure"] = pd.to_datetime(df["Date"]).dt.strftime("%H:%M") + df["Heure_obj"] = pd.to_datetime(df["Date"]).dt.time + + # --- Filtrage par période --- + plages = { + "Toute la journée": (time(0, 0), time(23, 59)), + "Matin (6h-12h)": (time(6, 0), time(12, 0)), + "Après-midi (12h-18h)": (time(12, 0), time(18, 0)), + "Nuit (18h-6h)": (time(18, 0), time(6, 0)), + } + heure_debut, heure_fin = plages.get(periode, (time(0, 0), time(23, 59))) + if heure_debut < heure_fin: + df = df[(df["Heure_obj"] >= heure_debut) & (df["Heure_obj"] <= heure_fin)] + else: + df = df[(df["Heure_obj"] >= heure_debut) | (df["Heure_obj"] <= heure_fin)] + + # --- Structuration des relevés --- + releves = {} + for sonde in df["Sonde"].unique(): + df_sonde = df[df["Sonde"] == sonde] + releves[sonde] = list(zip(df_sonde["Heure"], df_sonde["Temperature"])) + + # --- Requête alertes --- + table_alertes = f"Alertes_{site}" + pdf_cursor.execute(f"SELECT Sonde, Debut_defaut, Status FROM {table_alertes} WHERE DATE(Debut_defaut) = %s", (date_str,)) + alertes = pdf_cursor.fetchall() + + pdf_cursor.close() + conn.close() + + # --- Classe PDF --- + class RapportPDF(FPDF): + def header(self): + self.set_font("Arial", "B", 14) + self.cell(0, 10, "Rapport de surveillance des sondes", ln=1, align="C") + self.set_font("Arial", "", 12) + self.cell(0, 10, f"Date : {date_str}", ln=1, align="C") + self.cell(0, 10, f"Periode : {getattr(self, 'periode', '')}", ln=1, align="C") + self.ln(5) + + def site_info(self, site_name): + self.set_font("Arial", "B", 12) + self.cell(0, 10, f"Site : {site_name}", ln=1) + self.ln(2) + + def releves_section(self, data): + self.set_font("Arial", "B", 12) + self.cell(0, 10, "Relevés de température", ln=1) + + for sonde, mesures in data.items(): + self.set_font("Arial", "B", 11) + self.cell(0, 8, f"Sonde : {sonde}", ln=1) + + col1 = mesures[::2] + col2 = mesures[1::2] + + self.set_font("Arial", "B", 10) + self.cell(40, 6, "Heure", border=1) + self.cell(30, 6, "Temp (°C)", border=1) + self.cell(20, 6, "", border=0) + self.cell(40, 6, "Heure", border=1) + self.cell(30, 6, "Temp (°C)", border=1) + self.ln() + + self.set_font("Arial", "", 10) + for i in range(max(len(col1), len(col2))): + if i < len(col1): + h1, t1 = col1[i] + self.cell(40, 6, h1, border=1) + self.cell(30, 6, f"{t1:.2f}", border=1) + else: + self.cell(70, 6, "", border=0) + self.cell(20, 6, "", border=0) + if i < len(col2): + h2, t2 = col2[i] + self.cell(40, 6, h2, border=1) + self.cell(30, 6, f"{t2:.2f}", border=1) + self.ln() + self.ln(4) + + def alertes_section(self, data): + self.set_font("Arial", "B", 12) + self.cell(0, 10, "Alertes enregistrées", ln=1) + self.set_font("Arial", "", 10) + for a in data: + self.cell(0, 6, f"{a['Sonde']} - {a['Debut_defaut']} - {a['Status']}", ln=1) + + # --- Génération du PDF --- + pdf = RapportPDF() + pdf.periode = periode + pdf.add_page() + pdf.site_info(site) + pdf.releves_section(releves) + pdf.alertes_section(alertes) + + file_name = f"rapport_{site}_{date_str}.pdf" + output_dir = "PDF" + os.makedirs(output_dir, exist_ok=True) + output_path = os.path.join(output_dir, file_name) + + pdf.output(output_path) + + with open(output_path, "rb") as f: + st.download_button( + label="📥 Télécharger le rapport PDF", + data=f, + file_name=file_name, + mime="application/pdf" + ) + + except Exception as err1: + st.error(f"Erreur lors de la génération du PDF : {err1}") + + +# --- Initialisation des variables de session --- +if "authenticated" not in st.session_state: + st.session_state["authenticated"] = False +if "role" not in st.session_state: + st.session_state["role"] = None +if "lieu_autorise" not in st.session_state: + st.session_state["lieu_autorise"] = None + +# --- Connexion utilisateur dans la sidebar --- + st.sidebar.header("🔐 Connexion") +if not st.session_state.get("authenticated"): + login = st.sidebar.text_input("Nom d'utilisateur") + password = st.sidebar.text_input("Mot de passe", type="password") + if st.sidebar.button("Se connecter"): + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (login,)) + result = cursor.fetchone() + def verifier_password(mot_de_passe_saisi, hash_en_base): + return bcrypt.checkpw(mot_de_passe_saisi.encode('utf-8'), hash_en_base.encode('utf-8')) + if result and verifier_password(password, result["mot_de_passe"]): + if result["Expiration"] and result["Expiration"] < date.today(): + st.sidebar.error("⛔ Votre accès a expiré. Veuillez contacter un administrateur.") + cursor.close() + conn.close() + st.stop() + st.session_state["authenticated"] = True + st.session_state["role"] = result["role"] + st.session_state["lieu_autorise"] = result["Lieu"] + st.success(f"Connecté comme {result['role']} ({result['Lieu']})") + # Enregistrement de la connexion dans Connexion_Log + try: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + cursor.execute(""" + INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion) + VALUES (%s, %s, %s) + """, (login, result["Lieu"], now)) + conn.commit() + except Exception as e: + st.warning(f"⚠️ Connexion enregistrée échouée : {e}") + st.rerun() + else: + st.sidebar.error("Identifiants invalides") + cursor.close() + conn.close() + except Exception as e: + st.sidebar.error(f"Erreur lors de la connexion à la base : {e}") +else: + st.sidebar.success(f"Connecté ({st.session_state['role']})") + if st.sidebar.button("🔓 Déconnexion", key="logout_sidebar"): + st.session_state["authenticated"] = False + st.session_state["role"] = None + st.session_state["lieu_autorise"] = None + st.rerun() + +def hash_password(password): + return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') + +def ajouter_utilisateur(utilisateur, mot_de_passe, role, lieu, expiration): + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor() + cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (utilisateur,)) + if cursor.fetchone(): + return False, "❌ Utilisateur déjà existant." + + hash_mdp = hash_password(mot_de_passe) + cursor.execute(""" + INSERT INTO MotsDePasse (utilisateur, mot_de_passe, role, Lieu, Expiration) + VALUES (%s, %s, %s, %s, %s) + """, (utilisateur, hash_mdp, role, lieu, expiration)) + conn.commit() + cursor.close() + conn.close() + return True, "✅ Utilisateur ajouté avec succès." + except Exception as e: + return False, f"⚠️ Erreur : {e}" + +def afficher_gestion_expiration(conn): + st.subheader("🔐 Gestion des expirations d'accès") + + # Récupérer les utilisateurs avec leurs dates d'expiration + cursor = conn.cursor(dictionary=True) + cursor.execute("SELECT Id, utilisateur, Expiration FROM MotsDePasse") + users = cursor.fetchall() + cursor.close() + + df = pd.DataFrame(users) + df['Expiration'] = pd.to_datetime(df['Expiration']).dt.date + + for _, row in df.iterrows(): + est_expire = row['Expiration'] < date.today() + fond = "#ffe6e6" if est_expire else "#f8f9fa" + with st.container(): + st.markdown(f"
", + unsafe_allow_html=True) + col1, col2, col3 = st.columns([4, 2, 1]) + with col1: + st.markdown(f"**Utilisateur :** {row['utilisateur']}") + if est_expire: + st.markdown("⛔ Accès expiré", + unsafe_allow_html=True) + + with col2: + new_date = st.date_input("Expiration", row['Expiration'], key=f"exp_{row['Id']}") + with col3: + if st.button("✅", key=f"save_{row['Id']}"): + try: + cursor = conn.cursor() + cursor.execute("UPDATE MotsDePasse SET Expiration = %s WHERE Id = %s", + (new_date, row['Id'])) + conn.commit() + st.success(f"✅ {row['utilisateur']} mis à jour") + cursor.close() + except Exception as e: + st.error(f"Erreur : {e}") + +# 📄 Affichage bouton PDF si une date est choisie +site_pdf = ( + st.session_state.get("lieu_autorise") + if st.session_state.get("role") != "superviseur" + + else st.session_state.get("selected_site") + ) +date_pdf = st.session_state.get("selected_date") +if site_pdf and date_pdf: + st.sidebar.markdown("---") + st.sidebar.subheader("📄 Rapport PDF") + if st.sidebar.button("📥 Télécharger l’état du jour (PDF)", key="pdf_btn"): + periode = st.session_state.get("selected_periode", "Toute la journée") + generer_pdf(site_pdf, date, periode) + +# --- Forcer une alerte de test dynamique (réservé aux superviseurs) +if st.session_state.get("authenticated") and st.session_state.get("role") == "superviseur": + site_actuel = ( + st.session_state.get("lieu_autorise") or "Saclay" + if st.session_state.get("role") != "superviseur" + else st.session_state.get("selected_site") + ) + + if site_actuel: + # Initialiser la détection de changement de site + if "last_site" not in st.session_state: + st.session_state["last_site"] = None + + # Si le site a changé, recharger les sondes disponibles + if site_actuel != st.session_state["last_site"]: + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor() + cursor.execute(f"SELECT DISTINCT Sonde FROM `{site_actuel}` ORDER BY Sonde ASC") + st.session_state["sondes_dispo"] = [row[0] for row in cursor.fetchall()] + cursor.close() + conn.close() + st.session_state["last_site"] = site_actuel + except Exception as e: + st.sidebar.warning(f"Erreur chargement des sondes : {e}") + + # Affichage de la liste des sondes disponibles + sondes_dispo = st.session_state.get("sondes_dispo", []) + + if sondes_dispo: + st.sidebar.markdown("---") + st.sidebar.subheader("🧪 Test alerte manuelle") + sonde_test = st.sidebar.selectbox("Choisir une sonde :", sondes_dispo) + + if st.sidebar.button("🔔 Forcer une alerte de test"): + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor() + table_alertes = f"Alertes_{site_actuel}" + cursor.execute(f""" + INSERT INTO {table_alertes} (Sonde, Debut_defaut, Status) + VALUES (%s, %s, %s) + """, (sonde_test, now, "Test")) + conn.commit() + cursor.close() + conn.close() + st.success(f"Alerte de test créée pour {sonde_test} à {now}") + except Exception as e: + st.error(f"Erreur lors de la création de l'alerte : {e}") + + else: + st.success(f"Connecté ({st.session_state['role']})") + if st.button("🔓 Déconnexion", key="logout_main"): + st.session_state["authenticated"] = False + st.session_state["role"] = None + st.session_state["lieu_autorise"] = None + st.rerun() + + st.markdown("---") + st.subheader("📄 Rapport PDF") + if "selected_date" in st.session_state: + if st.button("📅 Télécharger l'état du jour (PDF)"): + site = st.session_state["lieu_autorise"] + date_val = st.session_state["selected_date"].strftime("%Y-%m-%d") + generer_pdf(site, date_val, periode) + else: + st.info("Sélectionnez une date pour activer la génération PDF.") +# Récupération des sondes actives +def get_sondes_par_lieu(lieu): + conn = get_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,)) + sondes = cursor.fetchall() + conn.close() + return sondes + +# Mise à jour statut entretien +def maj_entretien(sonde_id, statut, utilisateur): + conn = get_connection() + cursor = conn.cursor() + cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (statut, sonde_id)) + # Insertion dans Connexion_log + action = "Mise en entretien" if statut else "Sortie d'entretien" + cursor.execute( + "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action) VALUES (%s, %s, %s, %s)", + (utilisateur, lieu, datetime.now(), action) + ) + conn.commit() + conn.close() + +# --- CONTENU PRINCIPAL SI AUTHENTIFIÉ --- +if st.session_state["authenticated"]: + # --- AFFICHAGE GLOBAL DES ALERTES NON ACQUITTÉES --- + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + if "role" not in st.session_state: + st.session_state["role"] = None + + if "lieu_autorise" not in st.session_state: + st.session_state["lieu_autorise"] = None + site_selectionne = ( + st.session_state["lieu_autorise"] + if st.session_state["role"] != "superviseur" + else st.session_state.get("selected_site", "Saclay") + ) + + table_alertes = f"Alertes_{site_selectionne}" + cursor.execute( + f"SELECT Sonde, Debut_defaut, Status FROM `{table_alertes}` WHERE Status != 'Acquitté' ORDER BY Debut_defaut DESC" + ) + alertes = cursor.fetchall() + + if alertes: + df_alertes = pd.DataFrame(alertes) + st.subheader("🚨 Alertes non acquittées") + st.dataframe(df_alertes, use_container_width=True) + else: + st.success("✅ Aucune alerte en cours.") + + cursor.close() + conn.close() + except Exception as e: + st.error(f"Erreur lors de la récupération des alertes : {e}") + + # --- NAVIGATION --- (toujours défini, évite les erreurs) + onglet = st.session_state.get("onglet_actif", "Accueil") + + if st.session_state.get("authenticated"): + if st.session_state["role"] == "superviseur": + onglets_possibles = ["Accueil", "Statistiques", "Entretien", "Traffic", "Utilisateurs"] + + else: + onglets_possibles = ["Accueil", "Entretien"] + + onglet = st.sidebar.radio("📁 Navigation", onglets_possibles, index=onglets_possibles.index(onglet)) + st.session_state["onglet_actif"] = onglet + +# --- ONGLET ACCUEIL --- + if onglet == "Accueil": + st.markdown("## Sélection du site et de la date") + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + sites_possibles = ["Saclay", "Meudon"] + if st.session_state["role"] == "superviseur": + site_selectionne = st.selectbox("📍 Choisissez un site :", sites_possibles) + st.session_state["selected_site"] = site_selectionne + else: + site_selectionne = st.session_state["lieu_autorise"] + st.info(f"Site imposé : {site_selectionne}") + + selected_date = st.date_input("📅 Date du relevé", value=date.today()) + st.session_state["selected_date"] = selected_date + site_selectionne = st.session_state.get("lieu_autorise") or st.session_state.get("selected_site") + + if not site_selectionne: + st.warning("Aucun site sélectionné.") + st.stop() + cursor.execute( + f"SELECT * FROM `{site_selectionne}` WHERE DATE(Date) = %s ORDER BY Sonde, Date DESC", + (selected_date.strftime("%Y-%m-%d"),) + ) + rows = cursor.fetchall() + if rows: + df = pd.DataFrame(rows) + df["Date"] = pd.to_datetime(df["Date"]) + sondes = sorted(df["Sonde"].unique()) + sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes, key="selectbox_accueil") + df_sonde = df[df["Sonde"] == sonde_choisie] + + df_sonde.loc[:, "Heure"] = df_sonde["Date"].dt.hour + + tranche = st.radio("🕒 Tranche horaire :", + ["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"]) + st.session_state["selected_periode"] = tranche + if tranche == "Matin (6h-12h)": + df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)] + elif tranche == "Après-midi (12h-18h)": + df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)] + elif tranche == "Nuit (18h-6h)": + df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)] + df_sonde = df_sonde.copy() + + cursor.execute("SELECT Temp_Max FROM Chambres_froides WHERE Lieu = %s AND Sonde = %s", + (site_selectionne, sonde_choisie)) + seuil = cursor.fetchone() + seuil_temp = seuil["Temp_Max"] if seuil else 10 + + st.subheader("📊 Tableau des relevés") + df_filtre = df_sonde.copy() + df_filtre = df_filtre.drop(columns="Id", errors="ignore") + + + def surlignage_temp(val): + try: + if float(val) > seuil_temp: + return "color: red; font-weight: bold" + except: + pass + return "" + + + styled_df = df_filtre.style.applymap(surlignage_temp, subset=["Temperature"]) + st.dataframe(styled_df, use_container_width=True) + + st.subheader("📈 Évolution de la température") + fig, ax = plt.subplots(figsize=(10, 4)) + ax.plot(df_filtre["Date"], df_filtre["Temperature"], marker='o', label="Température") + ax.axhline(seuil_temp, color='red', linestyle='--', label=f"Seuil {seuil_temp}°C") + ax.set_xlabel("Heure") + ax.set_ylabel("Température (°C)") + ax.set_title(f"{sonde_choisie} - {selected_date.strftime('%d/%m/%Y')}") + ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M')) + ax.legend() + st.pyplot(fig) + + cursor.close() + conn.close() + + except Exception as e: + st.error(f"Erreur MySQL : {e}") + +# ---- ONGLET STATISTIQUES --- + elif onglet == "Statistiques": + st.markdown("## 📈 Statistiques de température") + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + + site = ( + st.session_state["lieu_autorise"] + if st.session_state["role"] != "superviseur" + else st.session_state.get("selected_site", "Saclay") + ) + + date_val = st.session_state.get("selected_date", date.today()) + + cursor.execute( + f"SELECT * FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", + (date_val.strftime("%Y-%m-%d"),) + ) + rows = cursor.fetchall() + df = pd.DataFrame(rows) + + if df.empty: + st.info("Aucune donnée pour cette date.") + else: + df["Date"] = pd.to_datetime(df["Date"]) + sondes = sorted(df["Sonde"].unique()) + sonde = st.selectbox("Choisir une sonde :", sondes, key="selectbox_stats") + df_sonde = df[df["Sonde"] == sonde] + + st.subheader("Évolution journalière") + fig, ax = plt.subplots(figsize=(10, 4)) + ax.plot(df_sonde["Date"], df_sonde["Temperature"], marker='o') + ax.set_title(f"{sonde} - {date_val.strftime('%d/%m/%Y')}") + ax.set_xlabel("Heure") + ax.set_ylabel("Température (°C)") + ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M')) + st.pyplot(fig) + + cursor.close() + conn.close() + + except Exception as e: + st.error(f"Erreur chargement statistiques : {e}") + + # --- Fonctionnalités administrateur : ajout et gestion des chambres froides --- + if st.session_state["role"] == "superviseur": + with st.expander("+ Ajouter une nouvelle chambre froide", expanded=False): + with st.form("ajout_sonde"): + nouvelle_sonde = st.text_input("Nom de la nouvelle sonde :") + seuil_max = st.number_input("Température maximale autorisée :", min_value=-50.0, max_value=50.0, + value=6.0, step=0.1) + + if st.form_submit_button("➕ Ajouter la nouvelle sonde"): + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor() + + # Insérer dans Chambres_froides + cursor.execute(""" + INSERT INTO Chambres_froides (Lieu, Sonde, Temp_Max, Etat) + VALUES (%s, %s, %s, %s) + """, (site_actuel, nouvelle_sonde, seuil_max, "ON")) + conn.commit() + + # Insérer un premier relevé dans la table de relevés + table_releves = site_actuel + cursor.execute(f""" + INSERT INTO `{table_releves}` (Date, Sonde, Temperature) + VALUES (NOW(), %s, %s) + """, (nouvelle_sonde, 0.0)) + conn.commit() + + cursor.close() + conn.close() + + st.success( + f"Sonde {nouvelle_sonde} ajoutée avec Temp_Max {seuil_max}°C et premier relevé enregistré.") + + except Exception as e: + st.error(f"Erreur lors de l'ajout de la sonde : {e}") + # --- Affichage automatique des alertes non acquittées --- + site_selectionne = ( + st.session_state.get("lieu_autorise") + if st.session_state.get("role") != "superviseur" + else st.session_state.get("selected_site") + ) + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + + table_alertes = f"Alertes_{site_selectionne}" + + cursor.close() + conn.close() + except Exception as e: + st.error(f"Erreur lors de la récupération des alertes : {e}") + + if st.session_state["role"] == "superviseur": + with st.expander("🛠️ Gestion des chambres froides (administrateur)", expanded=True): + if st.button("🔄 Actualiser la liste"): + st.session_state["refresh_admin"] = random.randint(0, 9999) + + try: + conn_admin = mysql.connector.connect(**db_config) + cursor_admin = conn_admin.cursor(dictionary=True) + cursor_admin.execute("SELECT * FROM Chambres_froides WHERE Lieu = %s", (site,)) + chambres = cursor_admin.fetchall() + + if not chambres: + st.warning("Aucune chambre froide pour ce site.") + else: + for chambre in chambres: + col1, col2, col3 = st.columns([3, 1, 2]) + with col1: + st.markdown(f"**{chambre['Sonde']}**") + + with col2: + etat = st.checkbox("ON", value=(chambre["Etat"] == "ON"), + key=f"etat_{chambre['Id']}_{st.session_state.get('refresh_admin', 0)}") + new_etat = "ON" if etat else "OFF" + + with col3: + temp_max = chambre["Temp_Max"] + moins, val, plus = st.columns([1, 2, 1]) + with moins: + if st.button("▼", key=f"moins_{chambre['Id']}"): + temp_max -= 1 + with val: + st.markdown(f"
{temp_max}°C
", + unsafe_allow_html=True) + with plus: + if st.button("▲", key=f"plus_{chambre['Id']}"): + temp_max += 1 + + if new_etat != chambre["Etat"] or temp_max != chambre["Temp_Max"]: + cursor_admin.execute( + "UPDATE Chambres_froides SET Etat = %s, Temp_Max = %s WHERE Id = %s", + (new_etat, temp_max, chambre["Id"]) + ) + conn_admin.commit() + st.success(f"{chambre['Sonde']} mise à jour") + + cursor_admin.close() + conn_admin.close() + + except Exception as e: + st.error(f"Erreur SQL (admin) : {e}") + +# --- ONGLET ENTRETIEN --- + elif onglet == "Entretien": + st.markdown("## 🧰 Gestion des sondes en entretien") + + + st.markdown("

", unsafe_allow_html=True) + role = st.session_state.get("role", "utilisateur") + lieu = st.session_state.get("lieu_autorise") if role != "superviseur" else st.selectbox("Choisir un lieu :", + ["Saclay", "Meudon"]) + if not lieu: + st.warning("Aucun site sélectionné.") + st.stop() + else: + st.info(f"Site imposé : {lieu}" if role != "superviseur" else "") + + def get_sondes_par_lieu(lieu): + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,)) + sondes = cursor.fetchall() + cursor.close() + conn.close() + return sondes + + def maj_entretien(sonde_id, en_entretien, utilisateur, cible): + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor() + cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (en_entretien, sonde_id)) + cursor.execute( + "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action, Cible) VALUES (%s, %s, %s, %s, %s)", + (utilisateur, lieu, datetime.now(), f"{'Mise' if en_entretien else 'Retrait'} en entretien", + cible)) + conn.commit() + cursor.close() + conn.close() + + + try: + sondes = get_sondes_par_lieu(lieu) + for sonde in sondes: + checked = st.checkbox(f"{sonde['Sonde']} (ID: {sonde['Id']})", value=sonde['En_entretien'], + key=f"entretien_{sonde['Id']}") + if checked != sonde['En_entretien']: + maj_entretien(sonde['Id'], checked, st.session_state.get("utilisateur", "Inconnu"), sonde['Sonde']) + st.success(f"{sonde['Sonde']} {'mise' if checked else 'retirée'} en entretien.") + except Exception as e: + st.error(f"Erreur lors du chargement des sondes : {e}") + + # --- ONGLET TRAFFIC ------ + elif onglet == "Traffic": + st.markdown("## 🚦 Connexions récentes") + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + cursor.execute( + "SELECT Utilisateur, Lieu, Date_Connexion FROM Connexion_Log WHERE Lieu IS NOT NULL ORDER BY Date_Connexion DESC LIMIT 200" + ) + connexions = cursor.fetchall() + cursor.close() + conn.close() + + if connexions: + df_connexions = pd.DataFrame(connexions) + st.dataframe(df_connexions, use_container_width=True) + else: + st.info("Aucune connexion enregistrée.") + except Exception as e: + st.error(f"Erreur chargement des connexions : {e}") + + # --- ONGLET UTILISATEURS --- + elif onglet == "Utilisateurs" and st.session_state.get("role") == "superviseur": + st.title("👥 Gestion des utilisateurs") + + # --- Section Ajout --- + with st.expander("➕ Ajouter un nouvel utilisateur"): + new_user = st.text_input("Nom d'utilisateur") + new_pass = st.text_input("Mot de passe", type="password") + new_role = st.selectbox("Rôle", ["utilisateur", "superviseur"]) + new_lieu = None + if new_role == "utilisateur": + new_lieu = st.selectbox("Lieu autorisé", ["Saclay", "Meudon", "Roissy"]) + expiration = st.date_input("Date d'expiration (facultative)", value=None) + + if st.button("Créer l'utilisateur"): + if new_user and new_pass: + success, message = ajouter_utilisateur( + utilisateur=new_user, + mot_de_passe=new_pass, + role=new_role, + lieu=new_lieu if new_role == "utilisateur" else None, + expiration=expiration if expiration else None + ) + if success: + st.success(message) + else: + st.error(message) + else: + st.warning("Tous les champs obligatoires doivent être remplis.") + + # --- Section Liste --- + st.markdown("### 📋 Utilisateurs existants") + try: + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + cursor.execute("SELECT utilisateur, role, Lieu, Expiration FROM MotsDePasse ORDER BY utilisateur") + users = cursor.fetchall() + cursor.close() + + if users: + df_users = pd.DataFrame(users) + st.dataframe(df_users) + else: + st.info("Aucun utilisateur trouvé.") + + # ✅ Appel de la gestion des expirations + afficher_gestion_expiration(conn) + conn.close() + + except Exception as e: + st.error(f"Erreur lors du chargement des utilisateurs : {e}") + + diff --git a/domo91.py b/domo91.py index 306a6df..c8854bf 100644 --- a/domo91.py +++ b/domo91.py @@ -4,485 +4,150 @@ import mysql.connector import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates -from fpdf import FPDF import os -import random from dotenv import load_dotenv -from datetime import datetime, date, time +from datetime import datetime, date import bcrypt +import traceback # Charger les variables d'environnement load_dotenv() st.set_page_config(page_title="Domo91 - Surveillance", layout="wide") -if "authenticated" not in st.session_state: - st.session_state["authenticated"] = False - st.session_state["role"] = None - st.session_state["lieu_autorise"] = None + +# Initialisation session state avec valeurs sûres +for key, default in { + "authenticated": False, + "role": None, + "lieu_autorise": None, + "onglet_actif": "Accueil", + "selected_date": date.today(), + "selected_site": "Saclay", + "selected_periode": "Toute la journée", +}.items(): + st.session_state.setdefault(key, default) st.title("📡 Supervision Températures") +# Configuration MySQL db_config = { "host": os.getenv("DB_HOST"), "user": os.getenv("DB_USER"), "password": os.getenv("DB_PASSWORD"), "database": os.getenv("DB_NAME") } + + def get_connection(): return mysql.connector.connect(**db_config) -# --- Fonction de génération PDF --- -def generer_pdf(site, date_str, periode): - st.info(f"Génération du rapport PDF pour {site} à la date {date_str} ({periode})") - try: - conn = mysql.connector.connect(**db_config) - pdf_cursor = conn.cursor(dictionary=True) - # Requête principale - pdf_cursor.execute(f"SELECT Sonde, Date, Temperature FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", (date_str,)) - rows = pdf_cursor.fetchall() - df = pd.DataFrame(rows) - df["Heure"] = pd.to_datetime(df["Date"]).dt.strftime("%H:%M") - df["Heure_obj"] = pd.to_datetime(df["Date"]).dt.time - - # --- Filtrage par période --- - plages = { - "Toute la journée": (time(0, 0), time(23, 59)), - "Matin (6h-12h)": (time(6, 0), time(12, 0)), - "Après-midi (12h-18h)": (time(12, 0), time(18, 0)), - "Nuit (18h-6h)": (time(18, 0), time(6, 0)), - } - heure_debut, heure_fin = plages.get(periode, (time(0, 0), time(23, 59))) - if heure_debut < heure_fin: - df = df[(df["Heure_obj"] >= heure_debut) & (df["Heure_obj"] <= heure_fin)] - else: - df = df[(df["Heure_obj"] >= heure_debut) | (df["Heure_obj"] <= heure_fin)] - - # --- Structuration des relevés --- - releves = {} - for sonde in df["Sonde"].unique(): - df_sonde = df[df["Sonde"] == sonde] - releves[sonde] = list(zip(df_sonde["Heure"], df_sonde["Temperature"])) - - # --- Requête alertes --- - table_alertes = f"Alertes_{site}" - pdf_cursor.execute(f"SELECT Sonde, Debut_defaut, Status FROM {table_alertes} WHERE DATE(Debut_defaut) = %s", (date_str,)) - alertes = pdf_cursor.fetchall() - - pdf_cursor.close() - conn.close() - - # --- Classe PDF --- - class RapportPDF(FPDF): - def header(self): - self.set_font("Arial", "B", 14) - self.cell(0, 10, "Rapport de surveillance des sondes", ln=1, align="C") - self.set_font("Arial", "", 12) - self.cell(0, 10, f"Date : {date_str}", ln=1, align="C") - self.cell(0, 10, f"Periode : {getattr(self, 'periode', '')}", ln=1, align="C") - self.ln(5) - - def site_info(self, site_name): - self.set_font("Arial", "B", 12) - self.cell(0, 10, f"Site : {site_name}", ln=1) - self.ln(2) - - def releves_section(self, data): - self.set_font("Arial", "B", 12) - self.cell(0, 10, "Relevés de température", ln=1) - - for sonde, mesures in data.items(): - self.set_font("Arial", "B", 11) - self.cell(0, 8, f"Sonde : {sonde}", ln=1) - - col1 = mesures[::2] - col2 = mesures[1::2] - - self.set_font("Arial", "B", 10) - self.cell(40, 6, "Heure", border=1) - self.cell(30, 6, "Temp (°C)", border=1) - self.cell(20, 6, "", border=0) - self.cell(40, 6, "Heure", border=1) - self.cell(30, 6, "Temp (°C)", border=1) - self.ln() - - self.set_font("Arial", "", 10) - for i in range(max(len(col1), len(col2))): - if i < len(col1): - h1, t1 = col1[i] - self.cell(40, 6, h1, border=1) - self.cell(30, 6, f"{t1:.2f}", border=1) - else: - self.cell(70, 6, "", border=0) - self.cell(20, 6, "", border=0) - if i < len(col2): - h2, t2 = col2[i] - self.cell(40, 6, h2, border=1) - self.cell(30, 6, f"{t2:.2f}", border=1) - self.ln() - self.ln(4) - - def alertes_section(self, data): - self.set_font("Arial", "B", 12) - self.cell(0, 10, "Alertes enregistrées", ln=1) - self.set_font("Arial", "", 10) - for a in data: - self.cell(0, 6, f"{a['Sonde']} - {a['Debut_defaut']} - {a['Status']}", ln=1) - - # --- Génération du PDF --- - pdf = RapportPDF() - pdf.periode = periode - pdf.add_page() - pdf.site_info(site) - pdf.releves_section(releves) - pdf.alertes_section(alertes) - - file_name = f"rapport_{site}_{date_str}.pdf" - output_dir = "PDF" - os.makedirs(output_dir, exist_ok=True) - output_path = os.path.join(output_dir, file_name) - - pdf.output(output_path) - - with open(output_path, "rb") as f: - st.download_button( - label="📥 Télécharger le rapport PDF", - data=f, - file_name=file_name, - mime="application/pdf" - ) - - except Exception as err1: - st.error(f"Erreur lors de la génération du PDF : {err1}") +def hash_password(plain_password): + return bcrypt.hashpw(plain_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') -# --- Initialisation des variables de session --- -if "authenticated" not in st.session_state: - st.session_state["authenticated"] = False -if "role" not in st.session_state: - st.session_state["role"] = None -if "lieu_autorise" not in st.session_state: - st.session_state["lieu_autorise"] = None +def verifier_password(input_password, hash_en_base): + return bcrypt.checkpw(input_password.encode('utf-8'), hash_en_base.encode('utf-8')) -# --- Connexion utilisateur dans la sidebar --- - st.sidebar.header("🔐 Connexion") -if not st.session_state.get("authenticated"): + +# --- Connexion utilisateur --- +if not st.session_state["authenticated"]: login = st.sidebar.text_input("Nom d'utilisateur") password = st.sidebar.text_input("Mot de passe", type="password") + if st.sidebar.button("Se connecter"): try: - conn = mysql.connector.connect(**db_config) + conn = get_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (login,)) result = cursor.fetchone() - def verifier_password(mot_de_passe_saisi, hash_en_base): - return bcrypt.checkpw(mot_de_passe_saisi.encode('utf-8'), hash_en_base.encode('utf-8')) + if result and verifier_password(password, result["mot_de_passe"]): if result["Expiration"] and result["Expiration"] < date.today(): - st.sidebar.error("⛔ Votre accès a expiré. Veuillez contacter un administrateur.") + st.sidebar.error("⛔ Accès expiré.") cursor.close() conn.close() st.stop() - st.session_state["authenticated"] = True - st.session_state["role"] = result["role"] - st.session_state["lieu_autorise"] = result["Lieu"] - st.success(f"Connecté comme {result['role']} ({result['Lieu']})") - # Enregistrement de la connexion dans Connexion_Log - try: - now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - cursor.execute(""" - INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion) - VALUES (%s, %s, %s) - """, (login, result["Lieu"], now)) - conn.commit() - except Exception as e: - st.warning(f"⚠️ Connexion enregistrée échouée : {e}") + st.session_state.update({ + "authenticated": True, + "role": result["role"], + "lieu_autorise": result["Lieu"] + }) + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + cursor.execute("INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion) VALUES (%s, %s, %s)", + (login, result["Lieu"], now_str)) + conn.commit() st.rerun() else: st.sidebar.error("Identifiants invalides") + cursor.close() conn.close() + except Exception as e: - st.sidebar.error(f"Erreur lors de la connexion à la base : {e}") + st.sidebar.error(f"Erreur connexion : {e}") else: st.sidebar.success(f"Connecté ({st.session_state['role']})") - if st.sidebar.button("🔓 Déconnexion", key="logout_sidebar"): - st.session_state["authenticated"] = False - st.session_state["role"] = None - st.session_state["lieu_autorise"] = None + if st.sidebar.button("🔓 Déconnexion"): + for key in ["authenticated", "role", "lieu_autorise"]: + st.session_state[key] = False if key == "authenticated" else None st.rerun() -def hash_password(password): - return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') - -def ajouter_utilisateur(utilisateur, mot_de_passe, role, lieu, expiration): - try: - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor() - cursor.execute("SELECT * FROM MotsDePasse WHERE utilisateur = %s", (utilisateur,)) - if cursor.fetchone(): - return False, "❌ Utilisateur déjà existant." - - hash_mdp = hash_password(mot_de_passe) - cursor.execute(""" - INSERT INTO MotsDePasse (utilisateur, mot_de_passe, role, Lieu, Expiration) - VALUES (%s, %s, %s, %s, %s) - """, (utilisateur, hash_mdp, role, lieu, expiration)) - conn.commit() - cursor.close() - conn.close() - return True, "✅ Utilisateur ajouté avec succès." - except Exception as e: - return False, f"⚠️ Erreur : {e}" - -def afficher_gestion_expiration(conn): - st.subheader("🔐 Gestion des expirations d'accès") - - # Récupérer les utilisateurs avec leurs dates d'expiration - cursor = conn.cursor(dictionary=True) - cursor.execute("SELECT Id, utilisateur, Expiration FROM MotsDePasse") - users = cursor.fetchall() - cursor.close() - - df = pd.DataFrame(users) - df['Expiration'] = pd.to_datetime(df['Expiration']).dt.date - - for _, row in df.iterrows(): - est_expire = row['Expiration'] < date.today() - fond = "#ffe6e6" if est_expire else "#f8f9fa" - with st.container(): - st.markdown(f"
", - unsafe_allow_html=True) - col1, col2, col3 = st.columns([4, 2, 1]) - with col1: - st.markdown(f"**Utilisateur :** {row['utilisateur']}") - if est_expire: - st.markdown("⛔ Accès expiré", - unsafe_allow_html=True) - - with col2: - new_date = st.date_input("Expiration", row['Expiration'], key=f"exp_{row['Id']}") - with col3: - if st.button("✅", key=f"save_{row['Id']}"): - try: - cursor = conn.cursor() - cursor.execute("UPDATE MotsDePasse SET Expiration = %s WHERE Id = %s", - (new_date, row['Id'])) - conn.commit() - st.success(f"✅ {row['utilisateur']} mis à jour") - cursor.close() - except Exception as e: - st.error(f"Erreur : {e}") - -# 📄 Affichage bouton PDF si une date est choisie -site_pdf = ( - st.session_state.get("lieu_autorise") - if st.session_state.get("role") != "superviseur" - - else st.session_state.get("selected_site") - ) -date_pdf = st.session_state.get("selected_date") -if site_pdf and date_pdf: - st.sidebar.markdown("---") - st.sidebar.subheader("📄 Rapport PDF") - if st.sidebar.button("📥 Télécharger l’état du jour (PDF)", key="pdf_btn"): - periode = st.session_state.get("selected_periode", "Toute la journée") - generer_pdf(site_pdf, date, periode) - -# --- Forcer une alerte de test dynamique (réservé aux superviseurs) -if st.session_state.get("authenticated") and st.session_state.get("role") == "superviseur": - site_actuel = ( - st.session_state.get("lieu_autorise") - if st.session_state.get("role") != "superviseur" - else st.session_state.get("selected_site") - ) - - if site_actuel: - # Initialiser la détection de changement de site - if "last_site" not in st.session_state: - st.session_state["last_site"] = None - - # Si le site a changé, recharger les sondes disponibles - if site_actuel != st.session_state["last_site"]: - try: - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor() - cursor.execute(f"SELECT DISTINCT Sonde FROM `{site_actuel}` ORDER BY Sonde ASC") - st.session_state["sondes_dispo"] = [row[0] for row in cursor.fetchall()] - cursor.close() - conn.close() - st.session_state["last_site"] = site_actuel - except Exception as e: - st.sidebar.warning(f"Erreur chargement des sondes : {e}") - - # Affichage de la liste des sondes disponibles - sondes_dispo = st.session_state.get("sondes_dispo", []) - - if sondes_dispo: - st.sidebar.markdown("---") - st.sidebar.subheader("🧪 Test alerte manuelle") - sonde_test = st.sidebar.selectbox("Choisir une sonde :", sondes_dispo) - - if st.sidebar.button("🔔 Forcer une alerte de test"): - now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - try: - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor() - table_alertes = f"Alertes_{site_actuel}" - cursor.execute(f""" - INSERT INTO {table_alertes} (Sonde, Debut_defaut, Status) - VALUES (%s, %s, %s) - """, (sonde_test, now, "Test")) - conn.commit() - cursor.close() - conn.close() - st.success(f"Alerte de test créée pour {sonde_test} à {now}") - except Exception as e: - st.error(f"Erreur lors de la création de l'alerte : {e}") - - else: - st.success(f"Connecté ({st.session_state['role']})") - if st.button("🔓 Déconnexion", key="logout_main"): - st.session_state["authenticated"] = False - st.session_state["role"] = None - st.session_state["lieu_autorise"] = None - st.rerun() - - st.markdown("---") - st.subheader("📄 Rapport PDF") - if "selected_date" in st.session_state: - if st.button("📅 Télécharger l'état du jour (PDF)"): - site = st.session_state["lieu_autorise"] - date_val = st.session_state["selected_date"].strftime("%Y-%m-%d") - generer_pdf(site, date_val, periode) - else: - st.info("Sélectionnez une date pour activer la génération PDF.") -# Récupération des sondes actives -def get_sondes_par_lieu(lieu): - conn = get_connection() - cursor = conn.cursor(dictionary=True) - cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,)) - sondes = cursor.fetchall() - conn.close() - return sondes - -# Mise à jour statut entretien -def maj_entretien(sonde_id, statut, utilisateur): - conn = get_connection() - cursor = conn.cursor() - cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (statut, sonde_id)) - # Insertion dans Connexion_log - action = "Mise en entretien" if statut else "Sortie d'entretien" - cursor.execute( - "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action) VALUES (%s, %s, %s, %s)", - (utilisateur, lieu, datetime.now(), action) - ) - conn.commit() - conn.close() - -# --- CONTENU PRINCIPAL SI AUTHENTIFIÉ --- +# --- Navigation --- if st.session_state["authenticated"]: - # --- AFFICHAGE GLOBAL DES ALERTES NON ACQUITTÉES --- - try: - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor(dictionary=True) - if "role" not in st.session_state: - st.session_state["role"] = None + onglets = ["Accueil", "Entretien"] if st.session_state["role"] != "superviseur" else ["Accueil", "Statistiques", "Entretien", "Traffic", "Utilisateurs"] + onglet_selectionne = st.sidebar.radio("📁 Navigation", onglets, index=onglets.index(st.session_state["onglet_actif"])) + st.session_state["onglet_actif"] = onglet_selectionne - if "lieu_autorise" not in st.session_state: - st.session_state["lieu_autorise"] = None - site_selectionne = ( - st.session_state["lieu_autorise"] - if st.session_state["role"] != "superviseur" - else st.session_state.get("selected_site", "Saclay") - ) + site_actuel = st.session_state.get("lieu_autorise") if st.session_state["role"] != "superviseur" else st.session_state.get("selected_site", "Saclay") + date_selectionnee = st.session_state.get("selected_date", date.today()) + periode_selectionnee = st.session_state.get("selected_periode", "Toute la journée") - table_alertes = f"Alertes_{site_selectionne}" - cursor.execute( - f"SELECT Sonde, Debut_defaut, Status FROM `{table_alertes}` WHERE Status != 'Acquitté' ORDER BY Debut_defaut DESC" - ) - alertes = cursor.fetchall() - - if alertes: - df_alertes = pd.DataFrame(alertes) - st.subheader("🚨 Alertes non acquittées") - st.dataframe(df_alertes, use_container_width=True) - else: - st.success("✅ Aucune alerte en cours.") - - cursor.close() - conn.close() - except Exception as e: - st.error(f"Erreur lors de la récupération des alertes : {e}") - - # --- NAVIGATION --- (toujours défini, évite les erreurs) - onglet = st.session_state.get("onglet_actif", "Accueil") - - if st.session_state.get("authenticated"): - if st.session_state["role"] == "superviseur": - onglets_possibles = ["Accueil", "Statistiques", "Entretien", "Traffic", "Utilisateurs"] - - else: - onglets_possibles = ["Accueil", "Entretien"] - - onglet = st.sidebar.radio("📁 Navigation", onglets_possibles, index=onglets_possibles.index(onglet)) - st.session_state["onglet_actif"] = onglet - -# --- ONGLET ACCUEIL --- - if onglet == "Accueil": - st.markdown("## Sélection du site et de la date") +# --- Onglet Accueil --- + if onglet_selectionne == "Accueil": try: - conn = mysql.connector.connect(**db_config) + conn = get_connection() cursor = conn.cursor(dictionary=True) - sites_possibles = ["Saclay", "Meudon"] + if st.session_state["role"] == "superviseur": - site_selectionne = st.selectbox("📍 Choisissez un site :", sites_possibles) - st.session_state["selected_site"] = site_selectionne + site_actuel = st.selectbox("📍 Choisissez un site :", ["Saclay", "Meudon"], index=0) + st.session_state["selected_site"] = site_actuel else: - site_selectionne = st.session_state["lieu_autorise"] - st.info(f"Site imposé : {site_selectionne}") + st.info(f"Site imposé : {site_actuel}") - selected_date = st.date_input("📅 Date du relevé", value=date.today()) - st.session_state["selected_date"] = selected_date - site_selectionne = st.session_state.get("lieu_autorise") or st.session_state.get("selected_site") + date_selectionnee = st.date_input("📅 Date du relevé", value=date_selectionnee) + st.session_state["selected_date"] = date_selectionnee - if not site_selectionne: - st.warning("Aucun site sélectionné.") - st.stop() - cursor.execute( - f"SELECT * FROM `{site_selectionne}` WHERE DATE(Date) = %s ORDER BY Sonde, Date DESC", - (selected_date.strftime("%Y-%m-%d"),) - ) + cursor.execute(f"SELECT * FROM `{site_actuel}` WHERE DATE(Date) = %s ORDER BY Sonde, Date DESC", + (date_selectionnee.strftime("%Y-%m-%d"),)) rows = cursor.fetchall() + if rows: df = pd.DataFrame(rows) df["Date"] = pd.to_datetime(df["Date"]) sondes = sorted(df["Sonde"].unique()) - sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes, key="selectbox_accueil") - df_sonde = df[df["Sonde"] == sonde_choisie] + sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes) + df_sonde = df[df["Sonde"] == sonde_choisie].copy() + df_sonde["Heure"] = df_sonde["Date"].dt.hour - df_sonde.loc[:, "Heure"] = df_sonde["Date"].dt.hour - - tranche = st.radio("🕒 Tranche horaire :", - ["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"]) + tranche = st.radio("🕒 Tranche horaire :", ["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"]) st.session_state["selected_periode"] = tranche + if tranche == "Matin (6h-12h)": df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)] elif tranche == "Après-midi (12h-18h)": df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)] elif tranche == "Nuit (18h-6h)": df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)] - df_sonde = df_sonde.copy() + seuil_temp = 10 cursor.execute("SELECT Temp_Max FROM Chambres_froides WHERE Lieu = %s AND Sonde = %s", - (site_selectionne, sonde_choisie)) + (site_actuel, sonde_choisie)) seuil = cursor.fetchone() - seuil_temp = seuil["Temp_Max"] if seuil else 10 + if seuil: + seuil_temp = seuil["Temp_Max"] st.subheader("📊 Tableau des relevés") - df_filtre = df_sonde.copy() - df_filtre = df_filtre.drop(columns="Id", errors="ignore") - def surlignage_temp(val): try: @@ -492,17 +157,16 @@ if st.session_state["authenticated"]: pass return "" - - styled_df = df_filtre.style.applymap(surlignage_temp, subset=["Temperature"]) + styled_df = df_sonde.style.applymap(surlignage_temp, subset=["Temperature"]) st.dataframe(styled_df, use_container_width=True) st.subheader("📈 Évolution de la température") fig, ax = plt.subplots(figsize=(10, 4)) - ax.plot(df_filtre["Date"], df_filtre["Temperature"], marker='o', label="Température") + ax.plot(df_sonde["Date"], df_sonde["Temperature"], marker='o') ax.axhline(seuil_temp, color='red', linestyle='--', label=f"Seuil {seuil_temp}°C") ax.set_xlabel("Heure") ax.set_ylabel("Température (°C)") - ax.set_title(f"{sonde_choisie} - {selected_date.strftime('%d/%m/%Y')}") + ax.set_title(f"{sonde_choisie} - {date_selectionnee.strftime('%d/%m/%Y')}") ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M')) ax.legend() st.pyplot(fig) @@ -510,11 +174,13 @@ if st.session_state["authenticated"]: cursor.close() conn.close() - except Exception as e: - st.error(f"Erreur MySQL : {e}") -# ---- ONGLET STATISTIQUES --- - elif onglet == "Statistiques": + except Exception as e: + st.error(f"Erreur : {e}") + st.text(traceback.format_exc()) + +# --- Onglet Statistiques --- + elif onglet_selectionne == "Statistiques": st.markdown("## 📈 Statistiques de température") try: conn = mysql.connector.connect(**db_config) @@ -558,229 +224,66 @@ if st.session_state["authenticated"]: except Exception as e: st.error(f"Erreur chargement statistiques : {e}") - # --- Fonctionnalités administrateur : ajout et gestion des chambres froides --- - if st.session_state["role"] == "superviseur": - with st.expander("+ Ajouter une nouvelle chambre froide", expanded=False): - with st.form("ajout_sonde"): - nouvelle_sonde = st.text_input("Nom de la nouvelle sonde :") - seuil_max = st.number_input("Température maximale autorisée :", min_value=-50.0, max_value=50.0, - value=6.0, step=0.1) - - if st.form_submit_button("➕ Ajouter la nouvelle sonde"): - try: - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor() - - # Insérer dans Chambres_froides - cursor.execute(""" - INSERT INTO Chambres_froides (Lieu, Sonde, Temp_Max, Etat) - VALUES (%s, %s, %s, %s) - """, (site_actuel, nouvelle_sonde, seuil_max, "ON")) - conn.commit() - - # Insérer un premier relevé dans la table de relevés - table_releves = site_actuel - cursor.execute(f""" - INSERT INTO `{table_releves}` (Date, Sonde, Temperature) - VALUES (NOW(), %s, %s) - """, (nouvelle_sonde, 0.0)) - conn.commit() - - cursor.close() - conn.close() - - st.success( - f"Sonde {nouvelle_sonde} ajoutée avec Temp_Max {seuil_max}°C et premier relevé enregistré.") - - except Exception as e: - st.error(f"Erreur lors de l'ajout de la sonde : {e}") - # --- Affichage automatique des alertes non acquittées --- - site_selectionne = ( - st.session_state.get("lieu_autorise") - if st.session_state.get("role") != "superviseur" - else st.session_state.get("selected_site") - ) +# --- Onglet Entretien --- + elif onglet_selectionne == "Entretien": + st.header("🧰 Gestion Entretien") try: - conn = mysql.connector.connect(**db_config) + conn = get_connection() cursor = conn.cursor(dictionary=True) - - table_alertes = f"Alertes_{site_selectionne}" - - cursor.close() - conn.close() - except Exception as e: - st.error(f"Erreur lors de la récupération des alertes : {e}") - - if st.session_state["role"] == "superviseur": - with st.expander("🛠️ Gestion des chambres froides (administrateur)", expanded=True): - if st.button("🔄 Actualiser la liste"): - st.session_state["refresh_admin"] = random.randint(0, 9999) - - try: - conn_admin = mysql.connector.connect(**db_config) - cursor_admin = conn_admin.cursor(dictionary=True) - cursor_admin.execute("SELECT * FROM Chambres_froides WHERE Lieu = %s", (site,)) - chambres = cursor_admin.fetchall() - - if not chambres: - st.warning("Aucune chambre froide pour ce site.") - else: - for chambre in chambres: - col1, col2, col3 = st.columns([3, 1, 2]) - with col1: - st.markdown(f"**{chambre['Sonde']}**") - - with col2: - etat = st.checkbox("ON", value=(chambre["Etat"] == "ON"), - key=f"etat_{chambre['Id']}_{st.session_state.get('refresh_admin', 0)}") - new_etat = "ON" if etat else "OFF" - - with col3: - temp_max = chambre["Temp_Max"] - moins, val, plus = st.columns([1, 2, 1]) - with moins: - if st.button("▼", key=f"moins_{chambre['Id']}"): - temp_max -= 1 - with val: - st.markdown(f"
{temp_max}°C
", - unsafe_allow_html=True) - with plus: - if st.button("▲", key=f"plus_{chambre['Id']}"): - temp_max += 1 - - if new_etat != chambre["Etat"] or temp_max != chambre["Temp_Max"]: - cursor_admin.execute( - "UPDATE Chambres_froides SET Etat = %s, Temp_Max = %s WHERE Id = %s", - (new_etat, temp_max, chambre["Id"]) - ) - conn_admin.commit() - st.success(f"{chambre['Sonde']} mise à jour") - - cursor_admin.close() - conn_admin.close() - - except Exception as e: - st.error(f"Erreur SQL (admin) : {e}") - -# --- ONGLET ENTRETIEN --- - elif onglet == "Entretien": - st.markdown("## 🧰 Gestion des sondes en entretien") - - - st.markdown("

", unsafe_allow_html=True) - role = st.session_state.get("role", "utilisateur") - lieu = st.session_state.get("lieu_autorise") if role != "superviseur" else st.selectbox("Choisir un lieu :", - ["Saclay", "Meudon"]) - if not lieu: - st.warning("Aucun site sélectionné.") - st.stop() - else: - st.info(f"Site imposé : {lieu}" if role != "superviseur" else "") - - def get_sondes_par_lieu(lieu): - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor(dictionary=True) - cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,)) + cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (site_actuel,)) sondes = cursor.fetchall() - cursor.close() - conn.close() - return sondes - def maj_entretien(sonde_id, en_entretien, utilisateur, cible): - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor() - cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (en_entretien, sonde_id)) - cursor.execute( - "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action, Cible) VALUES (%s, %s, %s, %s, %s)", - (utilisateur, lieu, datetime.now(), f"{'Mise' if en_entretien else 'Retrait'} en entretien", - cible)) - conn.commit() - cursor.close() - conn.close() - - - try: - sondes = get_sondes_par_lieu(lieu) for sonde in sondes: - checked = st.checkbox(f"{sonde['Sonde']} (ID: {sonde['Id']})", value=sonde['En_entretien'], - key=f"entretien_{sonde['Id']}") + checked = st.checkbox(f"{sonde['Sonde']}", value=sonde['En_entretien']) if checked != sonde['En_entretien']: - maj_entretien(sonde['Id'], checked, st.session_state.get("utilisateur", "Inconnu"), sonde['Sonde']) + cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", + (checked, sonde['Id'])) + conn.commit() st.success(f"{sonde['Sonde']} {'mise' if checked else 'retirée'} en entretien.") - except Exception as e: - st.error(f"Erreur lors du chargement des sondes : {e}") - # --- ONGLET TRAFFIC ------ - elif onglet == "Traffic": - st.markdown("## 🚦 Connexions récentes") - try: - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor(dictionary=True) - cursor.execute( - "SELECT Utilisateur, Lieu, Date_Connexion FROM Connexion_Log WHERE Lieu IS NOT NULL ORDER BY Date_Connexion DESC LIMIT 200" - ) - connexions = cursor.fetchall() cursor.close() conn.close() - if connexions: - df_connexions = pd.DataFrame(connexions) - st.dataframe(df_connexions, use_container_width=True) - else: - st.info("Aucune connexion enregistrée.") + except Exception as e: - st.error(f"Erreur chargement des connexions : {e}") + st.error(f"Erreur : {e}") + st.text(traceback.format_exc()) - # --- ONGLET UTILISATEURS --- - elif onglet == "Utilisateurs" and st.session_state.get("role") == "superviseur": - st.title("👥 Gestion des utilisateurs") +# --- Onglet Traffic --- + elif onglet_selectionne == "Traffic": + st.header("🚦 Connexions récentes") + try: + conn = get_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute("SELECT Utilisateur, Lieu, Date_Connexion FROM Connexion_Log ORDER BY Date_Connexion DESC LIMIT 100") + logs = cursor.fetchall() + df_logs = pd.DataFrame(logs) + st.dataframe(df_logs) + cursor.close() + conn.close() + except Exception as e: + st.error(f"Erreur : {e}") + st.text(traceback.format_exc()) - # --- Section Ajout --- - with st.expander("➕ Ajouter un nouvel utilisateur"): + # --- Onglet Utilisateurs --- + elif onglet_selectionne == "Utilisateurs": + st.header("👥 Gestion des utilisateurs") + with st.form("ajouter_utilisateur"): new_user = st.text_input("Nom d'utilisateur") new_pass = st.text_input("Mot de passe", type="password") new_role = st.selectbox("Rôle", ["utilisateur", "superviseur"]) - new_lieu = None - if new_role == "utilisateur": - new_lieu = st.selectbox("Lieu autorisé", ["Saclay", "Meudon", "Roissy"]) - expiration = st.date_input("Date d'expiration (facultative)", value=None) - - if st.button("Créer l'utilisateur"): - if new_user and new_pass: - success, message = ajouter_utilisateur( - utilisateur=new_user, - mot_de_passe=new_pass, - role=new_role, - lieu=new_lieu if new_role == "utilisateur" else None, - expiration=expiration if expiration else None - ) - if success: - st.success(message) - else: - st.error(message) - else: - st.warning("Tous les champs obligatoires doivent être remplis.") - - # --- Section Liste --- - st.markdown("### 📋 Utilisateurs existants") - try: - conn = mysql.connector.connect(**db_config) - cursor = conn.cursor(dictionary=True) - cursor.execute("SELECT utilisateur, role, Lieu, Expiration FROM MotsDePasse ORDER BY utilisateur") - users = cursor.fetchall() - cursor.close() - - if users: - df_users = pd.DataFrame(users) - st.dataframe(df_users) - else: - st.info("Aucun utilisateur trouvé.") - - # ✅ Appel de la gestion des expirations - afficher_gestion_expiration(conn) - conn.close() - - except Exception as e: - st.error(f"Erreur lors du chargement des utilisateurs : {e}") - - + new_lieu = st.selectbox("Lieu", ["Saclay", "Meudon", "Roissy"]) + if st.form_submit_button("Ajouter"): + try: + conn = get_connection() + cursor = conn.cursor() + hash_mdp = hash_password(new_pass) + cursor.execute("INSERT INTO MotsDePasse (utilisateur, mot_de_passe, role, Lieu) VALUES (%s, %s, %s, %s)", + (new_user, hash_mdp, new_role, new_lieu)) + conn.commit() + cursor.close() + conn.close() + st.success("Utilisateur ajouté.") + except Exception as e: + st.error(f"Erreur : {e}") + st.text(traceback.format_exc())