diff --git a/app/creer_user.py b/app/creer_user.py
index 452257f..cd27646 100644
--- a/app/creer_user.py
+++ b/app/creer_user.py
@@ -2,13 +2,22 @@
import os
from datetime import date, datetime
import re
-
import bcrypt
import mysql.connector
from mysql.connector import pooling, errorcode
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
+import smtplib
+from email.message import EmailMessage
+from email.utils import formatdate
+
+load_dotenv()
+SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
+SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) ou 587 (STARTTLS)
+SMTP_USER = os.getenv("SMTP_USER")
+SMTP_PASS = os.getenv("SMTP_PASS")
+SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER)
# -----------------------
# Auth minimale
@@ -45,8 +54,29 @@ def require_login():
else:
st.error("Identifiants invalides.")
st.stop()
+def logout_button():
+ st.markdown(
+ """
+
+ """,
+ unsafe_allow_html=True
+ )
+ st.markdown('
', unsafe_allow_html=True)
+ if st.button("🚪 Quitter", key="logout", use_container_width=False):
+ st.session_state.clear()
+ st.success("Déconnexion effectuée.")
+ st.rerun()
+ st.markdown('
', unsafe_allow_html=True)
+
require_login()
+logout_button()
# -----------------------
# Connexion MySQL via pool
@@ -108,13 +138,15 @@ def user_exists(cur, username: str, email: str) -> bool:
(count,) = cur.fetchone()
return count > 0
-def list_users(cnx, limit: int = 500):
- sql = """
- SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
- FROM Utilisateurs
- ORDER BY NomUtilisateur
- LIMIT %s
- """
+def list_users(cnx, limit: int = 500, include_password=False):
+ fields = [
+ "NomUtilisateur", "Nom_complet", "Site", "DateExpiration",
+ "Telephone", "email"
+ ]
+ if include_password:
+ fields.append("MotDePasse")
+
+ sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s"
with cnx.cursor(dictionary=True) as cur:
cur.execute(sql, (limit,))
return cur.fetchall()
@@ -195,6 +227,58 @@ def update_password(cnx, username: str, new_password: str):
)
return pwd_hash
+def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None):
+ if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
+ raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).")
+ if not to_email:
+ raise ValueError("Destinataire vide.")
+
+ msg = EmailMessage()
+ msg["From"] = SMTP_FROM
+ msg["To"] = to_email
+ msg["Date"] = formatdate(localtime=True)
+ msg["Subject"] = subject
+ msg.set_content(body_text)
+ if body_html:
+ msg.add_alternative(body_html, subtype="html")
+
+ # Choix du protocole en fonction du port
+ if SMTP_PORT == 465:
+ with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20) as s:
+ s.login(SMTP_USER, SMTP_PASS)
+ s.send_message(msg)
+ else:
+ # 587 (ou autre) : STARTTLS
+ with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s:
+ s.ehlo()
+ s.starttls() # passe en TLS
+ s.ehlo()
+ s.login(SMTP_USER, SMTP_PASS)
+ s.send_message(msg)
+
+ def get_user_email_and_field(cnx, username: str, field: str):
+ cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
+ if field not in cols:
+ field = "Nom_complet" # fallback
+ with cnx.cursor(dictionary=True) as cur:
+ cur.execute(
+ f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
+ (username,)
+ )
+ row = cur.fetchone()
+ return (row.get("email") if row else None, row.get("field_value") if row else None)
+
+def get_user_email_and_field(cnx, username: str, field: str):
+ cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
+ if field not in cols:
+ field = "Nom_complet" # fallback
+ with cnx.cursor(dictionary=True) as cur:
+ cur.execute(
+ f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
+ (username,)
+ )
+ row = cur.fetchone()
+ return (row.get("email") if row else None, row.get("field_value") if row else None)
# -----------------------
# UI
# -----------------------
@@ -203,25 +287,82 @@ st.title("Gestion des utilisateurs")
tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])
-# --- Onglet Liste ---
+# -----------------------
+# ONGLET LISTE
+# -----------------------
with tab_list:
st.subheader("Utilisateurs")
try:
cnx = pool.get_connection()
try:
- rows = list_users(cnx, limit=1000)
+ include_pw = os.getenv("ADMIN_USER") == st.session_state.get("current_user", os.getenv("ADMIN_USER"))
+ rows = list_users(cnx, limit=1000, include_password=include_pw)
finally:
cnx.close()
+
df = pd.DataFrame(rows)
if not df.empty:
+ from datetime import date
df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date
- st.dataframe(df, use_container_width=True, hide_index=True)
- st.caption(f"{len(df)} utilisateur(s)")
+
+ # --- Ajout coloration et statut expiration ---
+ ALERTE_JOURS = 30
+ exp = pd.to_datetime(df["DateExpiration"], errors="coerce")
+ today = pd.Timestamp(date.today())
+
+ df["Jours_restant"] = (exp - today).dt.days
+
+ def statut_expiration(j):
+ if pd.isna(j):
+ return "Inconnu"
+ j = int(j)
+ if j < 0:
+ return "Périmé"
+ if j <= ALERTE_JOURS:
+ return f"Bientôt (≤{ALERTE_JOURS}j)"
+ return "OK"
+
+ df["Statut"] = df["Jours_restant"].apply(statut_expiration)
+
+ c1, c2, c3 = st.columns([1, 1, 3])
+ with c1:
+ only_bad = st.checkbox("📌 Périmés / bientôt", value=False)
+ with c2:
+ tri_jours = st.checkbox("đź”˝ Trier par Jours restants", value=True)
+
+ if only_bad:
+ df = df[df["Statut"].isin(["Périmé", f"Bientôt (≤{ALERTE_JOURS}j)"])]
+
+ if tri_jours and "Jours_restant" in df.columns:
+ df = df.sort_values("Jours_restant", na_position="last")
+
+ def colorize(row):
+ stt = row.get("Statut", "")
+ n = len(row)
+ if stt == "Périmé":
+ return ["background-color:#ffe6e6; color:#b00020;"] * n # rouge clair
+ if stt.startswith("BientĂ´t"):
+ return ["background-color:#fff7e6; color:#8a6d3b;"] * n # orange clair
+ return [""] * n
+
+ styled = df.style.apply(colorize, axis=1)
+
+ st.dataframe(
+ styled,
+ use_container_width=True,
+ hide_index=True,
+ column_config={
+ "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"),
+ "Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"),
+ },
+ )
+ st.caption(f"{len(df)} utilisateur(s) affiché(s)")
else:
st.info("Aucun utilisateur Ă afficher.")
except Exception as e:
st.warning(f"Impossible de lister les utilisateurs : {e}")
+
# --- Onglet Créer ---
with tab_create:
with st.form("create_user_form", clear_on_submit=False):
@@ -299,16 +440,57 @@ with tab_edit:
new_value = st.date_input("Nouvelle valeur", value=date.today())
else:
new_value = st.text_input("Nouvelle valeur")
-
+ notify_user = st.checkbox("Notifier l’utilisateur par e-mail", value=True, help="Envoie un e-mail si coché")
update_btn = st.button("Mettre Ă jour", disabled=not sel_user, use_container_width=True)
if update_btn and sel_user:
try:
cnx = pool.get_connection()
try:
+ # 1) Lire email + ancienne valeur
+ to_email, old_value = get_user_email_and_field(cnx, sel_user, field)
+
+ # 2) Appliquer la mise Ă jour
update_field(cnx, sel_user, field, new_value)
finally:
cnx.close()
+
st.success(f"âś… {field} mis Ă jour pour {sel_user}")
+
+ # 3) Notification mail si demandé
+ if notify_user:
+ try:
+ nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value
+ ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value
+
+ if not to_email:
+ st.info("ℹ️ Aucune notification envoyée : adresse e-mail manquante.")
+ else:
+ subject = f"[Compte] Mise Ă jour de votre information : {field}"
+ body_txt = (
+ f"Bonjour,\n\n"
+ f"Votre information '{field}' vient d’être mise à jour par l’administrateur.\n"
+ f"Ancienne valeur : {ov}\n"
+ f"Nouvelle valeur : {nv}\n\n"
+ f"Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.\n"
+ f"Cordialement."
+ )
+ body_html = f"""
+
+ Bonjour,
+ Votre information {field} vient d’être mise à jour par l’administrateur.
+
+ - Ancienne valeur : {ov}
+ - Nouvelle valeur : {nv}
+
+ Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.
+ Cordialement.
+
+ """
+ send_mail(to_email, subject, body_txt, body_html)
+ st.success(f"✉️ Notification envoyée à {to_email}")
+ except Exception as e_mail:
+ st.warning(f"Notification non envoyée : {e_mail}")
+
except mysql.connector.Error as db_err:
st.error(f"Erreur MySQL : {db_err}")
except Exception as e: