# -*- coding: utf-8 -*- import streamlit as st import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates from fpdf import FPDF import os import random from dotenv import load_dotenv from datetime import datetime, date, time import bcrypt from .utils_db import connect_to_mysql # Charger les variables d'environnement load_dotenv() st.set_page_config(page_title="Domo91 - Surveillance", layout="wide") if "authenticated" not in st.session_state: st.session_state["authenticated"] = False st.session_state["role"] = None st.session_state["site_autorise"] = None st.title("📡 Supervision Températures") def get_connection(): return mysql.connector.connect(**db_config) # --- Fonction de génération PDF --- def generer_pdf(site, date_str, periode): st.info(f"Génération du rapport PDF pour {site} à la date {date_str} ({periode})") try: conn = mysql.connector.connect(**db_config) pdf_cursor = conn.cursor(dictionary=True) # Requête principale pdf_cursor.execute(f"SELECT Sonde, Date, Temperature FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", (date_str,)) rows = pdf_cursor.fetchall() df = pd.DataFrame(rows) df["Heure"] = pd.to_datetime(df["Date"]).dt.strftime("%H:%M") df["Heure_obj"] = pd.to_datetime(df["Date"]).dt.time # --- Filtrage par période --- plages = { "Toute la journée": (time(0, 0), time(23, 59)), "Matin (6h-12h)": (time(6, 0), time(12, 0)), "Après-midi (12h-18h)": (time(12, 0), time(18, 0)), "Nuit (18h-6h)": (time(18, 0), time(6, 0)), } heure_debut, heure_fin = plages.get(periode, (time(0, 0), time(23, 59))) if heure_debut < heure_fin: df = df[(df["Heure_obj"] >= heure_debut) & (df["Heure_obj"] <= heure_fin)] else: df = df[(df["Heure_obj"] >= heure_debut) | (df["Heure_obj"] <= heure_fin)] # --- Structuration des relevés --- releves = {} for sonde in df["Sonde"].unique(): df_sonde = df[df["Sonde"] == sonde] releves[sonde] = list(zip(df_sonde["Heure"], df_sonde["Temperature"])) # --- Requête alertes --- table_alertes = f"Alertes_{site}" pdf_cursor.execute(f"SELECT Sonde, Debut_defaut, Status FROM {table_alertes} WHERE DATE(Debut_defaut) = %s", (date_str,)) alertes = pdf_cursor.fetchall() pdf_cursor.close() conn.close() # --- Classe PDF --- class RapportPDF(FPDF): def header(self): self.set_font("Arial", "B", 14) self.cell(0, 10, "Rapport de surveillance des sondes", ln=1, align="C") self.set_font("Arial", "", 12) self.cell(0, 10, f"Date : {date_str}", ln=1, align="C") self.cell(0, 10, f"Periode : {getattr(self, 'periode', '')}", ln=1, align="C") self.ln(5) def site_info(self, site_name): self.set_font("Arial", "B", 12) self.cell(0, 10, f"Site : {site_name}", ln=1) self.ln(2) def releves_section(self, data): self.set_font("Arial", "B", 12) self.cell(0, 10, "Relevés de température", ln=1) for sonde, mesures in data.items(): self.set_font("Arial", "B", 11) self.cell(0, 8, f"Sonde : {sonde}", ln=1) col1 = mesures[::2] col2 = mesures[1::2] self.set_font("Arial", "B", 10) self.cell(40, 6, "Heure", border=1) self.cell(30, 6, "Temp (°C)", border=1) self.cell(20, 6, "", border=0) self.cell(40, 6, "Heure", border=1) self.cell(30, 6, "Temp (°C)", border=1) self.ln() self.set_font("Arial", "", 10) for i in range(max(len(col1), len(col2))): if i < len(col1): h1, t1 = col1[i] self.cell(40, 6, h1, border=1) self.cell(30, 6, f"{t1:.2f}", border=1) else: self.cell(70, 6, "", border=0) self.cell(20, 6, "", border=0) if i < len(col2): h2, t2 = col2[i] self.cell(40, 6, h2, border=1) self.cell(30, 6, f"{t2:.2f}", border=1) self.ln() self.ln(4) def alertes_section(self, data): self.set_font("Arial", "B", 12) self.cell(0, 10, "Alertes enregistrées", ln=1) self.set_font("Arial", "", 10) for a in data: self.cell(0, 6, f"{a['Sonde']} - {a['Debut_defaut']} - {a['Status']}", ln=1) # --- Génération du PDF --- pdf = RapportPDF() pdf.periode = periode pdf.add_page() pdf.site_info(site) pdf.releves_section(releves) pdf.alertes_section(alertes) file_name = f"rapport_{site}_{date_str}.pdf" output_dir = "PDF" os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, file_name) pdf.output(output_path) with open(output_path, "rb") as f: st.download_button( label="📥 Télécharger le rapport PDF", data=f, file_name=file_name, mime="application/pdf" ) except Exception as err1: st.error(f"Erreur lors de la génération du PDF : {err1}") # --- Initialisation des variables de session --- if "authenticated" not in st.session_state: st.session_state["authenticated"] = False if "role" not in st.session_state: st.session_state["role"] = None if "site_autorise" not in st.session_state: st.session_state["site_autorise"] = None # --- Connexion utilisateur dans la sidebar --- st.sidebar.header("🔐 Connexion") if not st.session_state.get("authenticated"): login = st.sidebar.text_input("Nom d'utilisateur") password = st.sidebar.text_input("Mot de passe", type="password") if st.sidebar.button("Se connecter"): try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM Sondes.MotsDePasse WHERE utilisateur = %s", (login,)) result = cursor.fetchone() def verifier_password(mot_de_passe_saisi, hash_en_base): return bcrypt.checkpw(mot_de_passe_saisi.encode('utf-8'), hash_en_base.encode('utf-8')) if result and verifier_password(password, result["mot_de_passe"]): if result["Expiration"] and result["Expiration"] < date.today(): st.sidebar.error("⛔ Votre accès a expiré. Veuillez contacter un administrateur.") cursor.close() conn.close() st.stop() st.session_state["authenticated"] = True st.session_state["role"] = result["role"] st.session_state["site_autorise"] = result["Lieu"] st.success(f"Connecté comme {result['role']} ({result['Lieu']})") # Enregistrement de la connexion dans Connexion_Log try: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") cursor.execute(""" INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion) VALUES (%s, %s, %s) """, (login, result["Lieu"], now)) conn.commit() except Exception as e: st.warning(f"⚠️ Connexion enregistrée échouée : {e}") st.rerun() else: st.sidebar.error("Identifiants invalides") cursor.close() conn.close() except Exception as e: st.sidebar.error(f"Erreur lors de la connexion à la base : {e}") else: st.sidebar.success(f"Connecté ({st.session_state['role']})") if st.sidebar.button("🔓 Déconnexion", key="logout_sidebar"): st.session_state["authenticated"] = False st.session_state["role"] = None st.session_state["site_autorise"] = None st.rerun() def hash_password(password): return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') def afficher_gestion_expiration(conn): st.subheader("🔐 Gestion des expirations d'accès") # Récupérer les utilisateurs avec leurs dates d'expiration cursor = conn.cursor(dictionary=True) cursor.execute("SELECT Id, utilisateur, Expiration FROM MotsDePasse") users = cursor.fetchall() cursor.close() df = pd.DataFrame(users) df['Expiration'] = pd.to_datetime(df['Expiration']).dt.date for _, row in df.iterrows(): est_expire = row['Expiration'] < date.today() fond = "#ffe6e6" if est_expire else "#f8f9fa" with st.container(): st.markdown(f"
", unsafe_allow_html=True) col1, col2, col3 = st.columns([4, 2, 1]) with col1: st.markdown(f"**Utilisateur :** {row['utilisateur']}") if est_expire: st.markdown("⛔ Accès expiré", unsafe_allow_html=True) with col2: new_date = st.date_input("Expiration", row['Expiration'], key=f"exp_{row['Id']}") with col3: if st.button("✅", key=f"save_{row['Id']}"): try: cursor = conn.cursor() cursor.execute("UPDATE MotsDePasse SET Expiration = %s WHERE Id = %s", (new_date, row['Id'])) conn.commit() st.success(f"✅ {row['utilisateur']} mis à jour") cursor.close() except Exception as e: st.error(f"Erreur : {e}") # 📄 Affichage bouton PDF si une date est choisie site_pdf = ( st.session_state.get("site_autorise") if st.session_state.get("role") != "superviseur" else st.session_state.get("selected_site") ) date_pdf = st.session_state.get("selected_date") if site_pdf and date_pdf: st.sidebar.markdown("---") st.sidebar.subheader("📄 Rapport PDF") if st.sidebar.button("📥 Télécharger l’état du jour (PDF)", key="pdf_btn"): periode = st.session_state.get("selected_periode", "Toute la journée") generer_pdf(site_pdf, date, periode) # --- Forcer une alerte de test dynamique (réservé aux superviseurs) if st.session_state.get("authenticated") and st.session_state.get("role") == "superviseur": site_actuel = ( st.session_state.get("site_autorise") or "Saclay" if st.session_state.get("role") != "superviseur" else st.session_state.get("selected_site") ) if site_actuel: # Initialiser la détection de changement de site if "last_site" not in st.session_state: st.session_state["last_site"] = None # Si le site a changé, recharger les sondes disponibles if site_actuel != st.session_state["last_site"]: try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() cursor.execute(f"SELECT DISTINCT Sonde FROM `{site_actuel}` ORDER BY Sonde ASC") st.session_state["sondes_dispo"] = [row[0] for row in cursor.fetchall()] cursor.close() conn.close() st.session_state["last_site"] = site_actuel except Exception as e: st.sidebar.warning(f"Erreur chargement des sondes : {e}") # Affichage de la liste des sondes disponibles sondes_dispo = st.session_state.get("sondes_dispo", []) if sondes_dispo: st.sidebar.markdown("---") st.sidebar.subheader("🧪 Test alerte manuelle") sonde_test = st.sidebar.selectbox("Choisir une sonde :", sondes_dispo) if st.sidebar.button("🔔 Forcer une alerte de test"): now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() table_alertes = f"Alertes_{site_actuel}" cursor.execute(f""" INSERT INTO {table_alertes} (Sonde, Debut_defaut, Status) VALUES (%s, %s, %s) """, (sonde_test, now, "Test")) conn.commit() cursor.close() conn.close() st.success(f"Alerte de test créée pour {sonde_test} à {now}") except Exception as e: st.error(f"Erreur lors de la création de l'alerte : {e}") else: st.success(f"Connecté ({st.session_state['role']})") if st.button("🔓 Déconnexion", key="logout_main"): st.session_state["authenticated"] = False st.session_state["role"] = None st.session_state["site_autorise"] = None st.rerun() st.markdown("---") st.subheader("📄 Rapport PDF") if "selected_date" in st.session_state: if st.button("📅 Télécharger l'état du jour (PDF)"): site = st.session_state["site_autorise"] date_val = st.session_state["selected_date"].strftime("%Y-%m-%d") generer_pdf(site, date_val, periode) else: st.info("Sélectionnez une date pour activer la génération PDF.") # Récupération des sondes actives def get_sondes_par_lieu(lieu): conn = get_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,)) sondes = cursor.fetchall() conn.close() return sondes # Mise à jour statut entretien def maj_entretien(sonde_id, statut, utilisateur): conn = get_connection() cursor = conn.cursor() cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (statut, sonde_id)) # Insertion dans Connexion_log action = "Mise en entretien" if statut else "Sortie d'entretien" cursor.execute( "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action) VALUES (%s, %s, %s, %s)", (utilisateur, lieu, datetime.now(), action) ) conn.commit() conn.close() # --- CONTENU PRINCIPAL SI AUTHENTIFIÉ --- if st.session_state["authenticated"]: # --- AFFICHAGE GLOBAL DES ALERTES NON ACQUITTÉES --- try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) if "role" not in st.session_state: st.session_state["role"] = None if "site_autorise" not in st.session_state: st.session_state["site_autorise"] = None site_selectionne = ( st.session_state["site_autorise"] if st.session_state["role"] != "superviseur" else st.session_state.get("selected_site", "Saclay") ) table_alertes = f"Alertes_{site_selectionne}" cursor.execute( f"SELECT Sonde, Debut_defaut, Status FROM `{table_alertes}` WHERE Status != 'Acquitté' ORDER BY Debut_defaut DESC" ) alertes = cursor.fetchall() if alertes: df_alertes = pd.DataFrame(alertes) st.subheader("🚨 Alertes non acquittées") st.dataframe(df_alertes, use_container_width=True) else: st.success("✅ Aucune alerte en cours.") cursor.close() conn.close() except Exception as e: st.error(f"Erreur lors de la récupération des alertes : {e}") # --- NAVIGATION --- (toujours défini, évite les erreurs) onglet = st.session_state.get("onglet_actif", "Accueil") if st.session_state.get("authenticated"): if st.session_state["role"] == "superviseur": onglets_possibles = ["Accueil", "Statistiques", "Entretien", "Traffic", "Utilisateurs"] else: onglets_possibles = ["Accueil", "Entretien"] onglet = st.sidebar.radio("📁 Navigation", onglets_possibles, index=onglets_possibles.index(onglet)) st.session_state["onglet_actif"] = onglet # --- Bandeau Alertes --- try: conn = get_connection() cursor = conn.cursor(dictionary=True) # Récupérer le site autorisé depuis la session site = st.session_state.get("lieu_autorise") if site: # Lecture des alertes non acquittées pour ce site cursor.execute(f""" SELECT Id, Sonde, Debut_defaut, Etat FROM Alertes_{site} WHERE Etat != 'Acquitté' ORDER BY Debut_defaut DESC """) alertes = cursor.fetchall() if alertes: st.markdown( f"
" f"🚨 {len(alertes)} alerte(s) non résolue(s) sur {site}" f"
", unsafe_allow_html=True ) else: st.markdown( f"
" f"✅ Aucune alerte en cours sur {site}" f"
", unsafe_allow_html=True ) cursor.close() conn.close() except Exception as e: st.error(f"Erreur lors de la récupération des alertes : {e}") # --- ONGLET ACCUEIL --- if onglet == "Accueil": st.markdown("## Sélection du site et de la date") try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) sites_possibles = ["Saclay", "Meudon"] if st.session_state["role"] == "superviseur": site_selectionne = st.selectbox("📍 Choisissez un site :", sites_possibles) st.session_state["selected_site"] = site_selectionne else: site_selectionne = st.session_state["site_autorise"] st.info(f"Site imposé : {site_selectionne}") selected_date = st.date_input("📅 Date du relevé", value=date.today()) st.session_state["selected_date"] = selected_date site_selectionne = st.session_state.get("site_autorise") or st.session_state.get("selected_site") if not site_selectionne: st.warning("Aucun site sélectionné.") st.stop() cursor.execute( f"SELECT * FROM `{site_selectionne}` WHERE DATE(Date) = %s ORDER BY Sonde, Date DESC", (selected_date.strftime("%Y-%m-%d"),) ) rows = cursor.fetchall() if rows: df = pd.DataFrame(rows) df["Date"] = pd.to_datetime(df["Date"]) sondes = sorted(df["Sonde"].unique()) sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes, key="selectbox_accueil") df_sonde = df[df["Sonde"] == sonde_choisie] df_sonde.loc[:, "Heure"] = df_sonde["Date"].dt.hour tranche = st.radio("🕒 Tranche horaire :", ["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"]) st.session_state["selected_periode"] = tranche if tranche == "Matin (6h-12h)": df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)] elif tranche == "Après-midi (12h-18h)": df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)] elif tranche == "Nuit (18h-6h)": df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)] df_sonde = df_sonde.copy() cursor.execute("SELECT Temp_Max FROM Chambres_froides WHERE Lieu = %s AND Sonde = %s", (site_selectionne, sonde_choisie)) seuil = cursor.fetchone() seuil_temp = seuil["Temp_Max"] if seuil else 10 st.subheader("📊 Tableau des relevés") df_filtre = df_sonde.copy() df_filtre = df_filtre.drop(columns="Id", errors="ignore") def surlignage_temp(val): try: if float(val) > seuil_temp: return "color: red; font-weight: bold" except: pass return "" styled_df = df_filtre.style.applymap(surlignage_temp, subset=["Temperature"]) st.dataframe(styled_df, use_container_width=True) st.subheader("📈 Évolution de la température") fig, ax = plt.subplots(figsize=(10, 4)) ax.plot(df_filtre["Date"], df_filtre["Temperature"], marker='o', label="Température") ax.axhline(seuil_temp, color='red', linestyle='--', label=f"Seuil {seuil_temp}°C") ax.set_xlabel("Heure") ax.set_ylabel("Température (°C)") ax.set_title(f"{sonde_choisie} - {selected_date.strftime('%d/%m/%Y')}") ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M')) ax.legend() st.pyplot(fig) cursor.close() conn.close() except Exception as e: st.error(f"Erreur MySQL : {e}") # ---- ONGLET STATISTIQUES --- elif onglet == "Statistiques": st.markdown("## 📈 Statistiques de température") try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) site = ( st.session_state["site_autorise"] if st.session_state["role"] != "superviseur" else st.session_state.get("selected_site", "Saclay") ) date_val = st.session_state.get("selected_date", date.today()) cursor.execute( f"SELECT * FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date", (date_val.strftime("%Y-%m-%d"),) ) rows = cursor.fetchall() df = pd.DataFrame(rows) if df.empty: st.info("Aucune donnée pour cette date.") else: df["Date"] = pd.to_datetime(df["Date"]) sondes = sorted(df["Sonde"].unique()) sonde = st.selectbox("Choisir une sonde :", sondes, key="selectbox_stats") df_sonde = df[df["Sonde"] == sonde] st.subheader("Évolution journalière") fig, ax = plt.subplots(figsize=(10, 4)) ax.plot(df_sonde["Date"], df_sonde["Temperature"], marker='o') ax.set_title(f"{sonde} - {date_val.strftime('%d/%m/%Y')}") ax.set_xlabel("Heure") ax.set_ylabel("Température (°C)") ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M')) st.pyplot(fig) cursor.close() conn.close() except Exception as e: st.error(f"Erreur chargement statistiques : {e}") # --- Fonctionnalités administrateur : ajout et gestion des chambres froides --- if st.session_state["role"] == "superviseur": with st.expander("+ Ajouter une nouvelle chambre froide", expanded=False): with st.form("ajout_sonde"): nouvelle_sonde = st.text_input("Nom de la nouvelle sonde :") seuil_max = st.number_input("Température maximale autorisée :", min_value=-50.0, max_value=50.0, value=6.0, step=0.1) if st.form_submit_button("➕ Ajouter la nouvelle sonde"): try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() # Insérer dans Chambres_froides cursor.execute(""" INSERT INTO Chambres_froides (Lieu, Sonde, Temp_Max, Etat) VALUES (%s, %s, %s, %s) """, (site_actuel, nouvelle_sonde, seuil_max, "ON")) conn.commit() # Insérer un premier relevé dans la table de relevés table_releves = site_actuel cursor.execute(f""" INSERT INTO `{table_releves}` (Date, Sonde, Temperature) VALUES (NOW(), %s, %s) """, (nouvelle_sonde, 0.0)) conn.commit() cursor.close() conn.close() st.success( f"Sonde {nouvelle_sonde} ajoutée avec Temp_Max {seuil_max}°C et premier relevé enregistré.") except Exception as e: st.error(f"Erreur lors de l'ajout de la sonde : {e}") # --- Affichage automatique des alertes non acquittées --- site_selectionne = ( st.session_state.get("site_autorise") if st.session_state.get("role") != "superviseur" else st.session_state.get("selected_site") ) try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) table_alertes = f"Alertes_{site_selectionne}" cursor.close() conn.close() except Exception as e: st.error(f"Erreur lors de la récupération des alertes : {e}") if st.session_state["role"] == "superviseur": with st.expander("🛠️ Gestion des chambres froides (administrateur)", expanded=True): if st.button("🔄 Actualiser la liste"): st.session_state["refresh_admin"] = random.randint(0, 9999) try: conn_admin = mysql.connector.connect(**db_config) cursor_admin = conn_admin.cursor(dictionary=True) cursor_admin.execute("SELECT * FROM Chambres_froides WHERE Lieu = %s", (site,)) chambres = cursor_admin.fetchall() if not chambres: st.warning("Aucune chambre froide pour ce site.") else: for chambre in chambres: col1, col2, col3 = st.columns([3, 1, 2]) with col1: st.markdown(f"**{chambre['Sonde']}**") with col2: etat = st.checkbox("ON", value=(chambre["Etat"] == "ON"), key=f"etat_{chambre['Id']}_{st.session_state.get('refresh_admin', 0)}") new_etat = "ON" if etat else "OFF" with col3: temp_max = chambre["Temp_Max"] moins, val, plus = st.columns([1, 2, 1]) with moins: if st.button("▼", key=f"moins_{chambre['Id']}"): temp_max -= 1 with val: st.markdown(f"
{temp_max}°C
", unsafe_allow_html=True) with plus: if st.button("▲", key=f"plus_{chambre['Id']}"): temp_max += 1 if new_etat != chambre["Etat"] or temp_max != chambre["Temp_Max"]: cursor_admin.execute( "UPDATE Chambres_froides SET Etat = %s, Temp_Max = %s WHERE Id = %s", (new_etat, temp_max, chambre["Id"]) ) conn_admin.commit() st.success(f"{chambre['Sonde']} mise à jour") cursor_admin.close() conn_admin.close() except Exception as e: st.error(f"Erreur SQL (admin) : {e}") # --- ONGLET ENTRETIEN --- elif onglet == "Entretien": st.markdown("## 🧰 Gestion des sondes en entretien") st.markdown("

", unsafe_allow_html=True) role = st.session_state.get("role", "utilisateur") lieu = st.session_state.get("site_autorise") if role != "superviseur" else st.selectbox("Choisir un lieu :", ["Saclay", "Meudon"]) if not lieu: st.warning("Aucun site sélectionné.") st.stop() else: st.info(f"Site imposé : {lieu}" if role != "superviseur" else "") def get_sondes_par_lieu(lieu): conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) cursor.execute("SELECT Id, Sonde, En_entretien FROM Chambres_froides WHERE Lieu = %s", (lieu,)) sondes = cursor.fetchall() cursor.close() conn.close() return sondes def maj_entretien(sonde_id, en_entretien, utilisateur, cible): conn = mysql.connector.connect(**db_config) cursor = conn.cursor() cursor.execute("UPDATE Chambres_froides SET En_entretien = %s WHERE Id = %s", (en_entretien, sonde_id)) cursor.execute( "INSERT INTO Connexion_Log (Utilisateur, Lieu, Date_Connexion, Action, Cible) VALUES (%s, %s, %s, %s, %s)", (utilisateur, lieu, datetime.now(), f"{'Mise' if en_entretien else 'Retrait'} en entretien", cible)) conn.commit() cursor.close() conn.close() try: sondes = get_sondes_par_lieu(lieu) for sonde in sondes: checked = st.checkbox(f"{sonde['Sonde']} (ID: {sonde['Id']})", value=sonde['En_entretien'], key=f"entretien_{sonde['Id']}") if checked != sonde['En_entretien']: maj_entretien(sonde['Id'], checked, st.session_state.get("utilisateur", "Inconnu"), sonde['Sonde']) st.success(f"{sonde['Sonde']} {'mise' if checked else 'retirée'} en entretien.") except Exception as e: st.error(f"Erreur lors du chargement des sondes : {e}") # --- ONGLET TRAFFIC ------ elif onglet == "Traffic": st.markdown("## 🚦 Connexions récentes") try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) cursor.execute( "SELECT Utilisateur, Lieu, Date_Connexion FROM Connexion_Log WHERE Lieu IS NOT NULL ORDER BY Date_Connexion DESC LIMIT 200" ) connexions = cursor.fetchall() cursor.close() conn.close() if connexions: df_connexions = pd.DataFrame(connexions) st.dataframe(df_connexions, use_container_width=True) else: st.info("Aucune connexion enregistrée.") except Exception as e: st.error(f"Erreur chargement des connexions : {e}") # --- ONGLET UTILISATEURS --- elif onglet == "Utilisateurs" and st.session_state.get("role") == "superviseur": st.title("👥 Gestion des utilisateurs") # --- Section Ajout --- with st.expander("➕ Ajouter un nouvel utilisateur"): new_user = st.text_input("Nom d'utilisateur") new_pass = st.text_input("Mot de passe", type="password") new_role = st.selectbox("Rôle", ["utilisateur", "superviseur"]) new_lieu = None if new_role == "utilisateur": new_lieu = st.selectbox("Lieu autorisé", ["Saclay", "Meudon", "Roissy"]) expiration = st.date_input("Date d'expiration (facultative)", value=None) if st.button("Créer l'utilisateur"): if new_user and new_pass: success, message = ajouter_utilisateur( utilisateur=new_user, mot_de_passe=new_pass, role=new_role, lieu=new_lieu if new_role == "utilisateur" else None, expiration=expiration if expiration else None ) if success: st.success(message) else: st.error(message) else: st.warning("Tous les champs obligatoires doivent être remplis.") # --- Section Liste --- st.markdown("### 📋 Utilisateurs existants") try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) cursor.execute("SELECT utilisateur, role, Lieu, Expiration FROM MotsDePasse ORDER BY utilisateur") users = cursor.fetchall() cursor.close() if users: df_users = pd.DataFrame(users) st.dataframe(df_users) else: st.info("Aucun utilisateur trouvé.") # ✅ Appel de la gestion des expirations afficher_gestion_expiration(conn) conn.close() except Exception as e: st.error(f"Erreur lors du chargement des utilisateurs : {e}")