# Streamlit app
import html
import os
import re
import smtplib
import ssl
from datetime import date, datetime
from email.message import EmailMessage
from email.utils import formatdate

import bcrypt
import mysql.connector
from mysql.connector import pooling, errorcode
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
# Load variables from the .env file before reading any setting below.
load_dotenv()
# SMTP settings, read once when the script starts.
SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) or 587 (STARTTLS)
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
# The sender address falls back to the SMTP login when SMTP_FROM is unset.
SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER)
# -----------------------
# Auth minimale
# -----------------------
def require_login():
    """Gate the rest of the script behind a single administrator login.

    Reads ``ADMIN_USER`` and ``ADMIN_PASS_HASH`` from the environment. If
    either is missing, an error is shown and the script run is stopped.
    While ``st.session_state.auth_ok`` is falsy, a login form is rendered
    and the run is stopped, so nothing after this call executes for an
    unauthenticated session. On a successful bcrypt check the flag is set
    and the script is rerun.
    """
    st.markdown(
        "",
        unsafe_allow_html=True,
    )
    load_dotenv()
    admin_user = os.getenv("ADMIN_USER")
    admin_hash = os.getenv("ADMIN_PASS_HASH")
    if not admin_user or not admin_hash:
        st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env")
        st.stop()

    if "auth_ok" not in st.session_state:
        st.session_state.auth_ok = False
    if st.session_state.auth_ok:
        return

    # Centre the form by using the middle column of three.
    _, center, _ = st.columns([1, 2, 1])
    with center:
        st.header("🔒 Accès restreint")
        u = st.text_input("Utilisateur")
        p = st.text_input("Mot de passe", type="password")
        if st.button("Se connecter", use_container_width=True):
            try:
                ok = (u == admin_user) and bcrypt.checkpw(
                    p.encode("utf-8"), admin_hash.encode("utf-8")
                )
            except Exception:
                # A malformed hash in .env makes bcrypt raise; treat it as a
                # failed login rather than crashing the page.
                ok = False
            if ok:
                st.session_state.auth_ok = True
                st.rerun()
            else:
                st.error("Identifiants invalides.")
    # Not authenticated: halt here so the rest of the app is never rendered.
    st.stop()
def logout_button():
    """Render the "Quitter" button.

    Clicking it clears the whole ``st.session_state`` (including the
    ``auth_ok`` flag) and reruns the script.
    """
    st.markdown("\n", unsafe_allow_html=True)
    if st.button("🚪 Quitter", key="logout", use_container_width=False):
        st.session_state.clear()
        st.success("Déconnexion effectuée.")
        st.rerun()
# Authenticate first: require_login() stops the script run for sessions that
# are not logged in, so everything below only executes after a login.
require_login()
logout_button()
# -----------------------
# Connexion MySQL via pool
# -----------------------
@st.cache_resource
def get_pool():
    """Create the MySQL connection pool used by every tab.

    Connection settings come from the environment: ``DB_HOST``,
    ``MYSQL_PORT`` (optional), ``DB_USER2``, ``DB_PASSWORD2`` and
    ``DB_NAME2``. The pool holds 5 connections with autocommit enabled.

    Raises:
        RuntimeError: a required variable is unset or empty.
    """
    load_dotenv()
    host = os.getenv("DB_HOST")
    # Fall back to 3306 when MYSQL_PORT is unset *or* blank; int("") would raise.
    port = int(os.getenv("MYSQL_PORT") or "3306")
    user = os.getenv("DB_USER2")
    pwd = os.getenv("DB_PASSWORD2")
    db = os.getenv("DB_NAME2")

    # Check the mandatory variables up front for a readable error message.
    required = {
        "DB_HOST": host,
        "DB_USER2": user,
        "DB_PASSWORD2": pwd,
        "DB_NAME2": db,
    }
    missing = [name for name, val in required.items() if val in (None, "")]
    if missing:
        raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")

    return pooling.MySQLConnectionPool(
        pool_name="users_pool",
        pool_size=5,
        pool_reset_session=True,
        host=host,
        port=port,
        user=user,
        password=pwd,
        database=db,
        autocommit=True,
    )
# -----------------------
# Helpers SQL + validations
# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PHONE_RE = re.compile(r"\d{10,14}")
def normalize_phone(p: str|None) -> str|None:
if not p:
return None
digits = re.sub(r"\D", "", p)
return digits if PHONE_RE.match(digits) else None
def to_sql_date(d: date | str | None) -> str | None:
if d is None:
return None
if isinstance(d, str):
try:
d = datetime.fromisoformat(d).date()
except Exception:
return None
return d.strftime("%Y-%m-%d")
def hash_password(plain: str, rounds: int = 12) -> str:
    """Return the bcrypt hash of *plain* as text, salted with *rounds* cost."""
    fresh_salt = bcrypt.gensalt(rounds=rounds)
    digest = bcrypt.hashpw(plain.encode("utf-8"), fresh_salt)
    return digest.decode("utf-8")
def user_exists(cur, username: str) -> bool:
    """Tell whether a row with this ``NomUtilisateur`` exists.

    Runs a ``COUNT(*)`` query on the given open cursor.
    """
    query = "SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s"
    cur.execute(query, (username,))
    [total] = cur.fetchone()
    return total > 0
def find_users_by_email(cnx, email: str):
    """Return the accounts registered with *email*.

    Each result is a dict with ``NomUtilisateur`` and ``Site``, ordered by
    user name. The cursor is closed even if the query fails.
    """
    query = "SELECT NomUtilisateur, Site FROM Utilisateurs WHERE email=%s ORDER BY NomUtilisateur"
    cursor = cnx.cursor(dictionary=True)
    try:
        cursor.execute(query, (email,))
        matches = cursor.fetchall()
    finally:
        cursor.close()
    return matches
def list_users(cnx, limit: int = 500, include_password=False):
    """Fetch up to *limit* users ordered by ``NomUtilisateur``.

    Rows are returned as dicts. The ``MotDePasse`` column is only selected
    when *include_password* is true. The cursor is always closed.
    """
    columns = "NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email"
    if include_password:
        columns += ", MotDePasse"
    query = "SELECT " + columns + " FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s"
    cursor = cnx.cursor(dictionary=True)
    try:
        cursor.execute(query, (limit,))
        return cursor.fetchall()
    finally:
        cursor.close()
def insert_user(cnx, username, full_name, site, password, expires, phone, email, role):
    """Insert a new row into ``Utilisateurs`` and return the stored bcrypt hash.

    The phone number is normalised with ``normalize_phone`` and the expiry
    date with ``to_sql_date`` before insertion.

    Raises:
        ValueError: the e-mail is malformed or the password is empty.
        RuntimeError: a user with this ``NomUtilisateur`` already exists.
    """
    if not EMAIL_RE.match(email or ""):
        raise ValueError("Email invalide.")
    if not password:
        # Same rule as the password-reset tab: never store a hash of "".
        raise ValueError("Mot de passe vide.")
    phone_norm = normalize_phone(phone)
    exp_sql = to_sql_date(expires)
    cur = cnx.cursor()
    try:
        if user_exists(cur, username):
            raise RuntimeError("Nom d'utilisateur déjà existant.")
        # bcrypt is deliberately slow, so hash only once the name is known to be free.
        pwd_hash = hash_password(password)
        # NOTE(review): the clear-text password is written to MotDePasse here,
        # whereas update_password() sets that column to NULL — confirm whether
        # the plain copy is still needed.
        cur.execute(
            """
            INSERT INTO Utilisateurs
            (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email, role)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """,
            (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email, role),
        )
        return pwd_hash
    finally:
        cur.close()
def get_user_details(cnx, username: str):
    """Fetch one user's profile columns as a dict.

    Returns ``None`` when no row matches *username*. The password columns
    are not selected. The cursor is always closed.
    """
    query = """
        SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
        FROM Utilisateurs
        WHERE NomUtilisateur=%s
        """
    cursor = cnx.cursor(dictionary=True)
    try:
        cursor.execute(query, (username,))
        record = cursor.fetchone()
    finally:
        cursor.close()
    return record
def update_field(cnx, username: str, field: str, value):
    """Update one whitelisted column of a user.

    *field* must be one of ``Nom_complet``, ``Site``, ``DateExpiration``,
    ``Telephone`` or ``email``; the column name interpolated into the SQL
    always comes from this whitelist, never from the caller's string.

    An empty phone or date clears the column (NULL). A non-empty value
    that fails validation is rejected rather than silently stored as
    NULL, which would erase the current value.

    Raises:
        ValueError: unknown field, or invalid e-mail / phone / date.
    """
    allowed = {
        "Nom_complet": "Nom_complet",
        "Site": "Site",
        "DateExpiration": "DateExpiration",
        "Telephone": "Telephone",
        "email": "email",
    }
    if field not in allowed:
        raise ValueError("Champ non autorisé.")
    sql_field = allowed[field]

    if field == "email":
        if not EMAIL_RE.match(str(value)):
            raise ValueError("Email invalide.")
    elif field == "Telephone":
        if value:
            normalized = normalize_phone(str(value))
            if normalized is None:
                raise ValueError("Téléphone invalide.")
            value = normalized
        else:
            value = None
    elif field == "DateExpiration":
        if value in (None, ""):
            value = None
        else:
            sql_date = to_sql_date(value)
            if sql_date is None:
                raise ValueError("Date invalide.")
            value = sql_date

    cur = cnx.cursor()
    try:
        cur.execute(
            f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
            (value, username),
        )
    finally:
        cur.close()
def update_password(cnx, username: str, new_password: str):
    """Replace a user's password and return the new bcrypt hash.

    Stores the hash in ``MotDePasseHash`` and sets ``MotDePasse`` to NULL.
    The cursor is always closed.
    """
    new_hash = hash_password(new_password)
    statement = "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s"
    cursor = cnx.cursor()
    try:
        cursor.execute(statement, (new_hash, username))
    finally:
        cursor.close()
    return new_hash
def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None):
    """Send an e-mail through the SMTP server configured in the environment.

    The message always has a plain-text body; *body_html*, when given, is
    attached as an HTML alternative. Port 465 uses implicit TLS
    (``SMTP_SSL``); any other port connects in clear and upgrades with
    STARTTLS.

    Raises:
        RuntimeError: the SMTP configuration is incomplete.
        ValueError: *to_email* is empty.
    """
    if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
        raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).")
    if not to_email:
        raise ValueError("Destinataire vide.")

    msg = EmailMessage()
    msg["From"] = SMTP_FROM
    msg["To"] = to_email
    msg["Date"] = formatdate(localtime=True)
    msg["Subject"] = subject
    msg.set_content(body_text)
    if body_html:
        msg.add_alternative(body_html, subtype="html")

    # Without an explicit context smtplib does not verify the server
    # certificate; the default context checks the chain and the hostname.
    tls_context = ssl.create_default_context()

    # Pick the protocol from the port.
    if SMTP_PORT == 465:
        with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20, context=tls_context) as s:
            s.login(SMTP_USER, SMTP_PASS)
            s.send_message(msg)
    else:
        # 587 (or anything else): plain connection upgraded with STARTTLS.
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s:
            s.ehlo()
            s.starttls(context=tls_context)
            s.ehlo()  # re-identify over the encrypted channel
            s.login(SMTP_USER, SMTP_PASS)
            s.send_message(msg)
def get_user_email_and_field(cnx, username: str, field: str):
    """Return ``(email, current value of field)`` for one user.

    *field* is checked against a fixed set of column names before being
    interpolated into the SQL; an unknown name falls back to
    ``Nom_complet``. Returns ``(None, None)`` when the user does not exist.
    The cursor is always closed.
    """
    cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
    if field not in cols:
        field = "Nom_complet"  # fallback
    cur = cnx.cursor(dictionary=True)
    try:
        cur.execute(
            f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
            (username,),
        )
        row = cur.fetchone()
    finally:
        cur.close()
    if not row:
        return (None, None)
    return (row.get("email"), row.get("field_value"))
# -----------------------
# UI
# -----------------------
# NOTE(review): require_login() and logout_button() have already emitted
# Streamlit elements by the time set_page_config() runs. Streamlit releases
# that require it to be the first command will raise here — confirm against
# the version in use.
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
st.title("Gestion des utilisateurs")

# Without a pool nothing below can work, so stop the run on failure.
try:
    pool = get_pool()
except Exception as e:
    st.error(f"❌ Impossible d'initialiser le pool MySQL : {e}")
    st.stop()

tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])
# -----------------------
# ONGLET LISTE
# -----------------------
# "List" tab: table of users with an expiry status and row colouring.
with tab_list:
    st.subheader("Utilisateurs")
    try:
        cnx = pool.get_connection()
        try:
            include_pw = os.getenv("ADMIN_USER") == st.session_state.get(
                "current_user", os.getenv("ADMIN_USER")
            )
            rows = list_users(cnx, limit=1000, include_password=include_pw)
        finally:
            cnx.close()

        df = pd.DataFrame(rows)
        if not df.empty:
            df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date

            # --- Expiry status and colouring ---
            ALERTE_JOURS = 30
            # Each label is defined once so that the status column, the filter
            # and the colouring can never disagree on the spelling.
            STATUT_INCONNU = "Inconnu"
            STATUT_PERIME = "Périmé"
            STATUT_BIENTOT = f"Bientôt (≤{ALERTE_JOURS}j)"
            STATUT_OK = "OK"

            exp = pd.to_datetime(df["DateExpiration"], errors="coerce")
            today = pd.Timestamp(date.today())
            df["Jours_restant"] = (exp - today).dt.days

            def statut_expiration(j):
                """Map a number of remaining days to a status label."""
                if pd.isna(j):
                    return STATUT_INCONNU
                j = int(j)
                if j < 0:
                    return STATUT_PERIME
                if j <= ALERTE_JOURS:
                    return STATUT_BIENTOT
                return STATUT_OK

            df["Statut"] = df["Jours_restant"].apply(statut_expiration)

            c1, c2, c3 = st.columns([1, 1, 3])
            with c1:
                only_bad = st.checkbox("🔎 Périmés / bientôt", value=False)
            with c2:
                tri_jours = st.checkbox("🔽 Trier par Jours restants", value=True)

            if only_bad:
                df = df[df["Statut"].isin([STATUT_PERIME, STATUT_BIENTOT])]
            if tri_jours and "Jours_restant" in df.columns:
                df = df.sort_values("Jours_restant", na_position="last")

            def colorize(row):
                """Return one CSS string per cell, chosen from the row's status."""
                stt = row.get("Statut", "")
                n = len(row)
                if stt == STATUT_PERIME:
                    return ["background-color:#ffe6e6; color:#b00020;"] * n  # light red
                if stt == STATUT_BIENTOT:
                    return ["background-color:#fff7e6; color:#8a6d3b;"] * n  # light orange
                return [""] * n

            styled = df.style.apply(colorize, axis=1)
            st.dataframe(
                styled,
                use_container_width=True,
                hide_index=True,
                height=700,  # shows about 20 rows without scrolling
                column_config={
                    "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"),
                    "Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"),
                },
            )
            st.caption(f"{len(df)} utilisateur(s) affiché(s)")
        else:
            st.info("Aucun utilisateur à afficher.")
    except Exception as e:
        st.warning(f"Impossible de lister les utilisateurs : {e}")
# -----------------------
# ONGLET CREER
# -----------------------
# "Create" tab: form for a new account, with an optional welcome e-mail.
with tab_create:
    with st.form("create_user_form", clear_on_submit=False):
        st.subheader("Nouveau compte")
        c1, c2, c3 = st.columns([1.2, 1.5, 1])
        username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
        full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
        site = c3.text_input("Site", placeholder="Roissy", value="Roissy")
        role = st.selectbox("Rôle", ["Utilisateur", "Administrateur"], index=0)
        c4, c5, c6 = st.columns([1.4, 1, 1])
        email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
        phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32")
        expires = c6.date_input("DateExpiration", value=date.today())
        c7, c8 = st.columns(2)
        password = c7.text_input("Mot de passe", type="password")
        password2 = c8.text_input("Confirmer", type="password")
        # The left column is only a spacer; the checkbox sits in the right one.
        _, col_cb2 = st.columns([1.2, 1])
        notify_welcome = col_cb2.checkbox(
            "Envoyer un e-mail de bienvenue",
            value=True,
            help="Enverra l'identifiant, le nom, le site et le mot de passe en clair",
        )
        submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)

    if submitted:
        if not username or not full_name or not site or not email:
            st.error("Champs requis manquants.")
        elif not EMAIL_RE.match(email):
            st.error("Format d’e-mail invalide.")
        elif not password:
            # Two empty fields are "equal", so the mismatch check alone lets
            # an empty password through.
            st.error("Mot de passe vide.")
        elif password != password2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                cnx = pool.get_connection()
                try:
                    # Warn when the e-mail is already in use, without blocking creation.
                    dup = find_users_by_email(cnx, email)
                    if dup:
                        liste = ", ".join(f"{u['NomUtilisateur']}@{u['Site']}" for u in dup)
                        st.info(f"Cet e-mail est déjà utilisé par : {liste}")
                    pwd_hash = insert_user(
                        cnx, username=username, full_name=full_name, site=site,
                        password=password, expires=expires, phone=phone, email=email, role=role
                    )
                finally:
                    cnx.close()

                st.success("Utilisateur créé avec succès ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(pwd_hash)

                # Optional welcome e-mail; a mail failure must not undo the creation.
                if notify_welcome:
                    try:
                        expires_txt = expires.strftime("%Y-%m-%d")
                        subj = f"[Compte créé] Vos accès – {site}"
                        body_txt = (
                            "Bonjour,\n\n"
                            "Votre compte a été créé.\n\n"
                            f"Nom d’utilisateur : {username}\n"
                            f"Nom complet : {full_name}\n"
                            f"Site : {site}\n"
                            f"Mot de passe : {password}\n"
                            f"Date d'expiration: {expires_txt}\n\n"
                            "Cordialement."
                        )
                        details = [
                            ("Nom d’utilisateur", username),
                            ("Nom complet", full_name),
                            ("Site", site),
                            ("Mot de passe", password),
                            ("Date d'expiration", expires_txt),
                        ]
                        # Values come from free-text inputs: escape them before
                        # placing them in HTML.
                        rows_html = "".join(
                            f'<tr><th align="left">{html.escape(label)}</th>'
                            f"<td>{html.escape(str(val))}</td></tr>"
                            for label, val in details
                        )
                        body_html = (
                            "<p>Bonjour,</p>"
                            "<p>Votre compte a été créé.</p>"
                            f"<table>{rows_html}</table>"
                            "<p>Cordialement.</p>"
                        )
                        send_mail(email, subj, body_txt, body_html)
                        st.success(f"✉️ E-mail de bienvenue envoyé à {email}")
                    except Exception as e_mail:
                        st.warning(f"E-mail non envoyé : {e_mail}")
            except mysql.connector.Error as db_err:
                if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                    st.error("Identifiants MySQL invalides.")
                else:
                    st.error(f"Erreur MySQL : {db_err}")
            except Exception as e:
                st.error(f"Erreur : {e}")
# -----------------------
# ONGLET MODIFIER
# -----------------------
# "Edit" tab: change one field of an existing user, with optional notification.
with tab_edit:
    st.subheader("Modifier un utilisateur existant")
    try:
        cnx = pool.get_connection()
        try:
            users = list_users(cnx, limit=1000)
        finally:
            cnx.close()
        usernames = [u["NomUtilisateur"] for u in users]
    except Exception as e:
        users = []
        usernames = []
        st.warning(f"Impossible de charger la liste des utilisateurs : {e}")

    top_left, top_right = st.columns([1.2, 2])
    with top_left:
        sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
        field = st.selectbox(
            "Champ à modifier",
            ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
        )
    with top_right:
        # The input widget depends on the type of the chosen field.
        if field == "DateExpiration":
            new_value = st.date_input("Nouvelle valeur", value=date.today())
        else:
            new_value = st.text_input("Nouvelle valeur")
        notify_user = st.checkbox(
            "Notifier l’utilisateur par e-mail", value=True, help="Envoie un e-mail si coché"
        )

    update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True)
    if update_btn and sel_user:
        try:
            cnx = pool.get_connection()
            try:
                # 1) Read the e-mail and the old value before overwriting it.
                to_email, old_value = get_user_email_and_field(cnx, sel_user, field)
                # 2) Apply the update.
                update_field(cnx, sel_user, field, new_value)
            finally:
                cnx.close()

            st.success(f"✅ {field} mis à jour pour {sel_user}")

            # 3) Optional notification; a mail failure must not mask the update.
            if notify_user:
                try:
                    nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value
                    ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value
                    if not to_email:
                        st.info("ℹ️ Aucune notification envoyée : adresse e-mail manquante.")
                    else:
                        subject = f"[Compte] Mise à jour de votre information : {field}"
                        body_txt = (
                            f"Bonjour,\n\n"
                            f"Votre information '{field}' vient d’être mise à jour par l’administrateur.\n"
                            f"Ancienne valeur : {ov}\n"
                            f"Nouvelle valeur : {nv}\n\n"
                            f"Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.\n"
                            f"Cordialement."
                        )
                        # Old and new values are free text: escape them before
                        # placing them in HTML.
                        body_html = (
                            "<p>Bonjour,</p>"
                            f"<p>Votre information <b>{html.escape(str(field))}</b> "
                            "vient d’être mise à jour par l’administrateur.</p>"
                            "<ul>"
                            f"<li>Ancienne valeur : {html.escape(str(ov))}</li>"
                            f"<li>Nouvelle valeur : {html.escape(str(nv))}</li>"
                            "</ul>"
                            "<p>Si vous n’êtes pas à l’origine de cette demande, "
                            "répondez à cet e-mail.</p>"
                            "<p>Cordialement.</p>"
                        )
                        send_mail(to_email, subject, body_txt, body_html)
                        st.success(f"✉️ Notification envoyée à {to_email}")
                except Exception as e_mail:
                    st.warning(f"Notification non envoyée : {e_mail}")
        except mysql.connector.Error as db_err:
            st.error(f"Erreur MySQL : {db_err}")
        except Exception as e:
            st.error(f"Erreur : {e}")
# -----------------------
# ONGLET SECURITE
# -----------------------
# "Security" tab: reset a user's password (stored as a bcrypt hash).
with tab_security:
    st.subheader("Réinitialiser le mot de passe (bcrypt)")
    # Load the names on every run: a list cached in the session would not
    # show users created after the cache was filled.
    try:
        cnx = pool.get_connection()
        try:
            pw_user_list = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
        finally:
            cnx.close()
    except Exception as e:
        pw_user_list = []
        st.warning(f"Impossible de charger la liste des utilisateurs : {e}")

    user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
    colp1, colp2 = st.columns(2)
    new_pw = colp1.text_input("Nouveau mot de passe", type="password")
    new_pw2 = colp2.text_input("Confirmer", type="password")

    if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
        if not new_pw:
            st.error("Mot de passe vide.")
        elif new_pw != new_pw2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                cnx = pool.get_connection()
                try:
                    h = update_password(cnx, user_pw, new_pw)
                finally:
                    cnx.close()
                st.success("Mot de passe mis à jour ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(h)
            except Exception as e:
                st.error(f"Erreur : {e}")