# app_users.py — création + modification de champs (wide + onglets + grille)
import os
import re
from datetime import date, datetime
import bcrypt
import mysql.connector
from mysql.connector import errorcode
import streamlit as st
from dotenv import load_dotenv
# -----------------------
# Auth minimale
# -----------------------
def require_login():
st.markdown(
"""
""",
unsafe_allow_html=True
)
load_dotenv()
admin_user = os.getenv("ADMIN_USER")
admin_hash = os.getenv("ADMIN_PASS_HASH")
if not admin_hash:
st.error("ADMIN_PASS_HASH manquant dans .env")
st.stop()
if "auth_ok" not in st.session_state:
st.session_state.auth_ok = False
if not st.session_state.auth_ok:
st.markdown("
🔐 Accès restreint
", unsafe_allow_html=True)
# conteneur centré
login_left, login_center, login_right = st.columns([1, 2, 1])
with login_center:
st.markdown(
"""
""",
unsafe_allow_html=True
)
u = st.text_input("Utilisateur", key="login_user")
p = st.text_input("Mot de passe", type="password", key="login_pass")
if st.button("Se connecter", use_container_width=True):
if u == admin_user and bcrypt.checkpw(p.encode(), admin_hash.encode()):
st.session_state.auth_ok = True
st.rerun()
else:
st.error("Identifiants invalides")
st.markdown("
", unsafe_allow_html=True)
st.stop()
require_login()
# -----------------------
# Connexion MySQL
# -----------------------
@st.cache_resource
def get_connection():
load_dotenv()
host = os.getenv("DB_HOST")
port = int(os.getenv("MYSQL_PORT", "3306"))
user = os.getenv("DB_USER")
pwd = os.getenv("DB_PASSWORD")
db = os.getenv("DB_NAME")
missing = [k for k, v in {
"MYSQL_USER": user, "MYSQL_PASSWORD": pwd, "MYSQL_HOST": host, "MYSQL_PORT": port, "MYSQL_DATABASE": db
}.items() if v in (None, "")]
if missing:
raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")
return mysql.connector.connect(
host=host, port=port, user=user, password=pwd, database=db, autocommit=True
)
# -----------------------
# Utilitaires
# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def normalize_phone(phone: str | None) -> str | None:
if not phone:
return None
return re.sub(r"[^\d+]", "", phone)
def to_sql_date(d: date | str) -> str:
if isinstance(d, date):
return d.strftime("%Y-%m-%d")
for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
try:
return datetime.strptime(d, fmt).strftime("%Y-%m-%d")
except ValueError:
continue
raise ValueError("Date invalide (attendu: YYYY-MM-DD ou JJ/MM/AAAA)")
def hash_password(plain: str, rounds: int = 12) -> str:
salt = bcrypt.gensalt(rounds=rounds)
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
def user_exists(cursor, username: str, email: str) -> bool:
cursor.execute(
"SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s",
(username, email),
)
(count,) = cursor.fetchone()
return count > 0
def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False):
if not EMAIL_RE.match(email):
raise ValueError("Email invalide.")
phone_norm = normalize_phone(phone)
exp_sql = to_sql_date(expires)
pwd_hash = hash_password(password)
with cnx.cursor() as cur:
if user_exists(cur, username, email):
raise RuntimeError("Nom d'utilisateur ou email déjà existant.")
if store_plain:
cur.execute(
"""
INSERT INTO Utilisateurs
(NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
""",
(username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email),
)
else:
cur.execute(
"""
INSERT INTO Utilisateurs
(NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
""",
(username, full_name, site, None, pwd_hash, exp_sql, phone_norm, email),
)
return pwd_hash
def list_users(cnx, limit=200):
with cnx.cursor(dictionary=True) as cur:
cur.execute(
"""
SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
FROM Utilisateurs
ORDER BY NomUtilisateur ASC
LIMIT %s
""",
(limit,),
)
return cur.fetchall()
def get_user(cnx, username: str):
with cnx.cursor(dictionary=True) as cur:
cur.execute(
"""
SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
FROM Utilisateurs
WHERE NomUtilisateur=%s
""",
(username,),
)
return cur.fetchone()
def update_field(cnx, username: str, field: str, value):
allowed = {
"Nom_complet": "Nom_complet",
"Site": "Site",
"Telephone": "Telephone",
"DateExpiration": "DateExpiration",
"email": "email",
}
if field not in allowed:
raise ValueError("Champ non autorisé.")
sql_field = allowed[field]
if field == "email":
if not EMAIL_RE.match(str(value)):
raise ValueError("Email invalide.")
if field == "Telephone":
value = normalize_phone(str(value)) if value else None
if field == "DateExpiration":
value = to_sql_date(value)
with cnx.cursor() as cur:
cur.execute(
f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
(value, username),
)
def update_password(cnx, username: str, new_password: str):
pwd_hash = hash_password(new_password)
with cnx.cursor() as cur:
cur.execute(
"UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s",
(pwd_hash, username),
)
return pwd_hash
# -----------------------
# UI
# -----------------------
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
# CSS doux
st.markdown(
"""
""",
unsafe_allow_html=True
)
st.title("👤 Acces.Utilisateurs")
# Connexion DB (badge)
try:
cnx = get_connection()
st.caption("Connexion MySQL via `.env` : **OK** ✅")
except Exception as e:
st.error(f"Connexion MySQL impossible : {e}")
st.stop()
# Bouton Sortie / Exit
hdr_l, hdr_r = st.columns([1, 0.18])
with hdr_r:
if st.button("🔒 SORTIE / EXIT", use_container_width=True, help="Se déconnecter et verrouiller l'accès"):
st.session_state.auth_ok = False
st.rerun()
# -----------------------
# Onglets
# -----------------------
tab_list, tab_create, tab_edit, tab_security = st.tabs(
["📋 Liste", "🆕 Créer", "✏️ Modifier", "🔐 Sécurité"]
)
# --- Onglet Liste ---
with tab_list:
st.subheader("Utilisateurs récents")
try:
import pandas as pd
rows = list_users(cnx, limit=1000)
df = pd.DataFrame(rows)
if not df.empty:
df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date
from datetime import date as _date
today = _date.today()
expired_mask = df["DateExpiration"] <= today
col_a, col_b = st.columns(2)
col_a.metric("Total utilisateurs", len(df))
col_b.metric("Comptes périmés", int(expired_mask.sum()))
def style_expired(row):
if row["DateExpiration"] <= today:
return ["color: white; background-color: #d32f2f;"] * len(row)
return [""] * len(row)
styled = df.style.apply(style_expired, axis=1)
visible_rows = len(df)
row_px = 36
header_px = 48
padding_px = 24
height = min(1000, header_px + row_px * visible_rows + padding_px)
st.dataframe(
styled,
use_container_width=True,
hide_index=True,
height=height,
)
else:
st.info("Aucun utilisateur à afficher.")
except Exception as e:
st.warning(f"Impossible de lister les utilisateurs : {e}")
# --- Onglet Créer ---
with tab_create:
with st.form("create_user_form", clear_on_submit=False):
st.subheader("Nouveau compte")
c1, c2, c3 = st.columns([1.2, 1.5, 1])
username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
site = c3.text_input("Site", placeholder="Roissy", value="Roissy")
c4, c5, c6 = st.columns([1.4, 1, 1])
email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32")
expires = c6.date_input("DateExpiration", value=date.today())
c7, c8 = st.columns(2)
password = c7.text_input("Mot de passe (saisie)", type="password")
confirm = c8.text_input("Confirmer mot de passe", type="password")
store_plain = st.checkbox("Remplir aussi MotDePasse en clair (déconseillé)", value=False)
submitted = st.form_submit_button("Créer l’utilisateur", use_container_width=True)
if submitted:
if not username or not full_name or not site or not email or not password:
st.error("Merci de remplir tous les champs obligatoires.")
elif password != confirm:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
pwd_hash = insert_user(
cnx,
username=username.strip(),
full_name=full_name.strip(),
site=site.strip(),
password=password,
expires=expires,
phone=phone.strip() if phone else None,
email=email.strip(),
store_plain=store_plain,
)
st.success("Utilisateur créé avec succès ✅")
with st.expander("Voir le hash généré (MotDePasseHash)"):
st.code(pwd_hash)
except mysql.connector.Error as db_err:
if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
st.error("Identifiants MySQL invalides.")
else:
st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
st.error(f"Erreur : {e}")
# --- Onglet Modifier ---
with tab_edit:
st.subheader("Modifier un utilisateur existant")
try:
users = list_users(cnx, limit=1000)
usernames = [u["NomUtilisateur"] for u in users]
except Exception as e:
users = []
usernames = []
st.warning(f"Impossible de charger la liste des utilisateurs : {e}")
top_left, top_right = st.columns([1.2, 2])
with top_left:
sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
field = st.selectbox(
"Champ à modifier",
["Nom_complet", "Site", "Telephone", "DateExpiration", "email"]
)
if field == "DateExpiration":
new_value = st.date_input("Nouvelle valeur (date)", value=date.today(), key="new_date")
elif field == "Telephone":
new_value = st.text_input("Nouvelle valeur (téléphone)", key="new_tel")
else:
new_value = st.text_input("Nouvelle valeur", key="new_text")
update_btn = st.button(
"Mettre à jour le champ",
type="primary",
use_container_width=True,
disabled=not sel_user,
)
with top_right:
if sel_user:
details = get_user(cnx, sel_user)
if details:
c1, c2, c3, c4, c5 = st.columns(5, gap="small")
c1.markdown(f"Nom
{details['Nom_complet'] or ''}
", unsafe_allow_html=True)
c2.markdown(f"Site
{details['Site'] or ''}
", unsafe_allow_html=True)
c3.markdown(f"Expire
{details['DateExpiration'] or ''}
", unsafe_allow_html=True)
c4.markdown(f"Tél
{details['Telephone'] or ''}
", unsafe_allow_html=True)
c5.markdown(f"Email
{details['email'] or ''}
", unsafe_allow_html=True)
if update_btn and sel_user:
try:
update_field(cnx, sel_user, field, new_value)
st.success(f"✅ {field} mis à jour pour {sel_user}")
except mysql.connector.Error as db_err:
st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
st.error(f"Erreur : {e}")
# --- Onglet Sécurité ---
with tab_security:
st.subheader("Réinitialiser le mot de passe (bcrypt)")
try:
if 'user_cache_for_pw' not in st.session_state:
st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
pw_user_list = st.session_state.user_cache_for_pw
except Exception:
pw_user_list = []
user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
colp1, colp2 = st.columns(2)
new_pw = colp1.text_input("Nouveau mot de passe", type="password")
new_pw2 = colp2.text_input("Confirmer", type="password")
if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
if not new_pw:
st.error("Mot de passe vide.")
elif new_pw != new_pw2:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
h = update_password(cnx, user_pw, new_pw)
st.success("Mot de passe mis à jour ✅")
st.caption("Hash (MotDePasseHash) :")
st.code(h)
except Exception as e:
st.error(f"Erreur : {e}")